{GPL, other}: Relicense to MIT license.
[collectd.git] / src / dbi.c
index 484c604..7fec73c 100644 (file)
--- a/src/dbi.c
+++ b/src/dbi.c
@@ -1,76 +1,79 @@
 /**
  * collectd - src/dbi.c
- * Copyright (C) 2008  Florian octo Forster
+ * Copyright (C) 2008-2013  Florian octo Forster
  *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; only version 2 of the License is applicable.
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
  *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License for more details.
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
  *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
  *
  * Authors:
- *   Florian octo Forster <octo at verplant.org>
+ *   Florian octo Forster <octo at collectd.org>
  **/
 
 #include "collectd.h"
 #include "common.h"
 #include "plugin.h"
 #include "configfile.h"
+#include "utils_db_query.h"
 
 #include <dbi/dbi.h>
 
 /*
  * Data types
  */
-struct cdbi_driver_option_s
+struct cdbi_driver_option_s /* {{{ */
 {
   char *key;
-  char *value;
-};
-typedef struct cdbi_driver_option_s cdbi_driver_option_t;
-
-struct cdbi_query_s
-{
-  char    *name;
-  char    *statement;
-  char    *type;
-  char   **instances;
-  size_t   instances_num;
-  char   **values;
-  size_t   values_num;
+  union
+  {
+    char *string;
+    int numeric;
+  } value;
+  _Bool is_numeric;
 };
-typedef struct cdbi_query_s cdbi_query_t;
+typedef struct cdbi_driver_option_s cdbi_driver_option_t; /* }}} */
 
-struct cdbi_database_s
+struct cdbi_database_s /* {{{ */
 {
   char *name;
   char *select_db;
 
   char *driver;
+  char *host;
   cdbi_driver_option_t *driver_options;
   size_t driver_options_num;
 
-  cdbi_query_t **queries;
-  size_t      queries_num;
+  udb_query_preparation_area_t **q_prep_areas;
+  udb_query_t **queries;
+  size_t        queries_num;
 
   dbi_conn connection;
 };
-typedef struct cdbi_database_s cdbi_database_t;
+typedef struct cdbi_database_s cdbi_database_t; /* }}} */
 
 /*
  * Global variables
  */
-static cdbi_query_t    **queries       = NULL;
-static size_t         queries_num   = 0;
+static udb_query_t     **queries       = NULL;
+static size_t            queries_num   = 0;
 static cdbi_database_t **databases     = NULL;
-static size_t         databases_num = 0;
+static size_t            databases_num = 0;
+
+static int cdbi_read_database (user_data_t *ud);
 
 /*
  * Functions
@@ -99,19 +102,9 @@ static const char *cdbi_strerror (dbi_conn conn, /* {{{ */
 } /* }}} const char *cdbi_conn_error */
 
 static int cdbi_result_get_field (dbi_result res, /* {{{ */
-    const char *name, int dst_type, value_t *ret_value)
+    unsigned int index, char *buffer, size_t buffer_size)
 {
-  value_t value;
-  unsigned int index;
   unsigned short src_type;
-  dbi_conn connection;
-
-  index = dbi_result_get_field_idx (res, name);
-  if (index < 1)
-  {
-    ERROR ("dbi plugin: cdbi_result_get: No such column: %s.", name);
-    return (-1);
-  }
 
   src_type = dbi_result_get_field_type_idx (res, index);
   if (src_type == DBI_TYPE_ERROR)
@@ -121,87 +114,51 @@ static int cdbi_result_get_field (dbi_result res, /* {{{ */
     return (-1);
   }
 
-  if ((dst_type != DS_TYPE_COUNTER) && (dst_type != DS_TYPE_GAUGE))
-  {
-    ERROR ("dbi plugin: cdbi_result_get: Don't know how to handle "
-        "destination type %i.", dst_type);
-    return (-1);
-  }
-
   if (src_type == DBI_TYPE_INTEGER)
   {
-    if (dst_type == DS_TYPE_COUNTER)
-      value.counter = dbi_result_get_ulonglong_idx (res, index);
-    else
-      value.gauge = (gauge_t) dbi_result_get_longlong_idx (res, index);
+    long long value;
+
+    value = dbi_result_get_longlong_idx (res, index);
+    ssnprintf (buffer, buffer_size, "%lli", value);
   }
   else if (src_type == DBI_TYPE_DECIMAL)
   {
-    value.gauge = dbi_result_get_double_idx (res, index);
-    if (dst_type == DS_TYPE_COUNTER)
-      value.counter = (counter_t) round (value.gauge);
+    double value;
+
+    value = dbi_result_get_double_idx (res, index);
+    ssnprintf (buffer, buffer_size, "%63.15g", value);
   }
   else if (src_type == DBI_TYPE_STRING)
   {
-    const char *string = dbi_result_get_string_idx (res, index);
-    char *endptr = NULL;
-
-    if (string == NULL)
-      value.gauge = NAN;
-    else if (dst_type == DS_TYPE_COUNTER)
-      value.counter = (counter_t) strtoll (string, &endptr, 0);
-    else
-      value.gauge = (gauge_t) strtod (string, &endptr);
-
-    if (string == endptr)
-    {
-      ERROR ("dbi plugin: cdbi_result_get: Can't parse string as number: %s.",
-          string);
+    const char *value;
+    
+    value = dbi_result_get_string_idx (res, index);
+    if (value == NULL)
+      sstrncpy (buffer, "", buffer_size);
+    else if (strcmp ("ERROR", value) == 0)
       return (-1);
-    }
+    else
+      sstrncpy (buffer, value, buffer_size);
   }
+  /* DBI_TYPE_BINARY */
+  /* DBI_TYPE_DATETIME */
   else
   {
-    ERROR ("dbi plugin: cdbi_result_get: Don't know how to handle "
-        "source type %hu.", src_type);
-    return (-1);
-  }
+    const char *field_name;
 
-  connection = dbi_result_get_conn (res);
-  if (dbi_conn_error_flag (connection) != 0)
-  {
-    char errbuf[1024];
-    ERROR ("dbi plugin: cdbi_result_get: dbi_result_get_*_idx failed: %s.",
-        cdbi_strerror (connection, errbuf, sizeof (errbuf)));
+    field_name = dbi_result_get_field_name (res, index);
+    if (field_name == NULL)
+      field_name = "<unknown>";
+
+    ERROR ("dbi plugin: Column `%s': Don't know how to handle "
+        "source type %hu.",
+        field_name, src_type);
     return (-1);
   }
 
-  *ret_value = value;
   return (0);
 } /* }}} int cdbi_result_get_field */
 
-static void cdbi_query_free (cdbi_query_t *q) /* {{{ */
-{
-  size_t i;
-
-  if (q == NULL)
-    return;
-
-  sfree (q->name);
-  sfree (q->statement);
-  sfree (q->type);
-
-  for (i = 0; i < q->instances_num; i++)
-    sfree (q->instances[i]);
-  sfree (q->instances);
-
-  for (i = 0; i < q->values_num; i++)
-    sfree (q->values[i]);
-  sfree (q->values);
-
-  sfree (q);
-} /* }}} void cdbi_query_free */
-
 static void cdbi_database_free (cdbi_database_t *db) /* {{{ */
 {
   size_t i;
@@ -215,40 +172,30 @@ static void cdbi_database_free (cdbi_database_t *db) /* {{{ */
   for (i = 0; i < db->driver_options_num; i++)
   {
     sfree (db->driver_options[i].key);
-    sfree (db->driver_options[i].value);
+    if (!db->driver_options[i].is_numeric)
+      sfree (db->driver_options[i].value.string);
   }
   sfree (db->driver_options);
 
+  if (db->q_prep_areas)
+    for (i = 0; i < db->queries_num; ++i)
+      udb_query_delete_preparation_area (db->q_prep_areas[i]);
+  free (db->q_prep_areas);
+
   sfree (db);
 } /* }}} void cdbi_database_free */
 
-static void cdbi_submit (cdbi_database_t *db, cdbi_query_t *q, /* {{{ */
-    char **instances, value_t *values)
-{
-  value_list_t vl = VALUE_LIST_INIT;
-
-  vl.values = values;
-  vl.values_len = (int) q->values_num;
-  vl.time = time (NULL);
-  sstrncpy (vl.host, hostname_g, sizeof (vl.host));
-  sstrncpy (vl.plugin, "dbi", sizeof (vl.plugin));
-  sstrncpy (vl.plugin_instance, db->name, sizeof (vl.type_instance));
-  sstrncpy (vl.type, q->type, sizeof (vl.type));
-  strjoin (vl.type_instance, sizeof (vl.type_instance),
-      instances, q->instances_num, "-");
-  vl.type_instance[sizeof (vl.type_instance) - 1] = 0;
-
-  plugin_dispatch_values (&vl);
-} /* }}} void cdbi_submit */
-
 /* Configuration handling functions {{{
  *
  * <Plugin dbi>
  *   <Query "plugin_instance0">
  *     Statement "SELECT name, value FROM table"
- *     Type "gauge"
- *     InstancesFrom "name"
- *     ValuesFrom "value"
+ *     <Result>
+ *       Type "gauge"
+ *       InstancesFrom "name"
+ *       ValuesFrom "value"
+ *     </Result>
+ *     ...
  *   </Query>
  *     
  *   <Database "plugin_instance1">
@@ -260,192 +207,6 @@ static void cdbi_submit (cdbi_database_t *db, cdbi_query_t *q, /* {{{ */
  * </Plugin>
  */
 
-static int cdbi_config_set_string (char **ret_string, /* {{{ */
-    oconfig_item_t *ci)
-{
-  char *string;
-
-  if ((ci->values_num != 1)
-      || (ci->values[0].type != OCONFIG_TYPE_STRING))
-  {
-    WARNING ("dbi plugin: The `%s' config option "
-        "needs exactly one string argument.", ci->key);
-    return (-1);
-  }
-
-  string = strdup (ci->values[0].value.string);
-  if (string == NULL)
-  {
-    ERROR ("dbi plugin: strdup failed.");
-    return (-1);
-  }
-
-  if (*ret_string != NULL)
-    free (*ret_string);
-  *ret_string = string;
-
-  return (0);
-} /* }}} int cdbi_config_set_string */
-
-static int cdbi_config_add_string (char ***ret_array, /* {{{ */
-    size_t *ret_array_len, oconfig_item_t *ci)
-{
-  char **array;
-  size_t array_len;
-  int i;
-
-  if (ci->values_num < 1)
-  {
-    WARNING ("dbi plugin: The `%s' config option "
-        "needs at least one argument.", ci->key);
-    return (-1);
-  }
-
-  for (i = 0; i < ci->values_num; i++)
-  {
-    if (ci->values[i].type != OCONFIG_TYPE_STRING)
-    {
-      WARNING ("dbi plugin: Argument %i to the `%s' option "
-          "is not a string.", i + 1, ci->key);
-      return (-1);
-    }
-  }
-
-  array_len = *ret_array_len;
-  array = (char **) realloc (*ret_array,
-      sizeof (char *) * (array_len + ci->values_num));
-  if (array == NULL)
-  {
-    ERROR ("dbi plugin: realloc failed.");
-    return (-1);
-  }
-  *ret_array = array;
-
-  for (i = 0; i < ci->values_num; i++)
-  {
-    array[array_len] = strdup (ci->values[i].value.string);
-    if (array[array_len] == NULL)
-    {
-      ERROR ("dbi plugin: strdup failed.");
-      *ret_array_len = array_len;
-      return (-1);
-    }
-    array_len++;
-  }
-
-  *ret_array_len = array_len;
-  return (0);
-} /* }}} int cdbi_config_add_string */
-
-static int cdbi_config_add_query (oconfig_item_t *ci) /* {{{ */
-{
-  cdbi_query_t *q;
-  int status;
-  int i;
-
-  if ((ci->values_num != 1)
-      || (ci->values[0].type != OCONFIG_TYPE_STRING))
-  {
-    WARNING ("dbi plugin: The `Query' block "
-        "needs exactly one string argument.");
-    return (-1);
-  }
-
-  q = (cdbi_query_t *) malloc (sizeof (*q));
-  if (q == NULL)
-  {
-    ERROR ("dbi plugin: malloc failed.");
-    return (-1);
-  }
-  memset (q, 0, sizeof (*q));
-
-  status = cdbi_config_set_string (&q->name, ci);
-  if (status != 0)
-  {
-    sfree (q);
-    return (status);
-  }
-
-  /* Fill the `cdbi_query_t' structure.. */
-  for (i = 0; i < ci->children_num; i++)
-  {
-    oconfig_item_t *child = ci->children + i;
-
-    if (strcasecmp ("Statement", child->key) == 0)
-      status = cdbi_config_set_string (&q->statement, child);
-    else if (strcasecmp ("Type", child->key) == 0)
-      status = cdbi_config_set_string (&q->type, child);
-    else if (strcasecmp ("InstancesFrom", child->key) == 0)
-      status = cdbi_config_add_string (&q->instances, &q->instances_num, child);
-    else if (strcasecmp ("ValuesFrom", child->key) == 0)
-      status = cdbi_config_add_string (&q->values, &q->values_num, child);
-    else
-    {
-      WARNING ("dbi plugin: Option `%s' not allowed here.", child->key);
-      status = -1;
-    }
-
-    if (status != 0)
-      break;
-  }
-
-  /* Check that all necessary options have been given. */
-  while (status == 0)
-  {
-    if (q->statement == NULL)
-    {
-      WARNING ("dbi plugin: `Statement' not given for query `%s'", q->name);
-      status = -1;
-    }
-    if (q->type == NULL)
-    {
-      WARNING ("dbi plugin: `Type' not given for query `%s'", q->name);
-      status = -1;
-    }
-    if (q->instances == NULL)
-    {
-      WARNING ("dbi plugin: `InstancesFrom' not given for query `%s'", q->name);
-      status = -1;
-    }
-    if (q->values == NULL)
-    {
-      WARNING ("dbi plugin: `ValuesFrom' not given for query `%s'", q->name);
-      status = -1;
-    }
-
-    break;
-  } /* while (status == 0) */
-
-  /* If all went well, add this query to the list of queries within the
-   * database structure. */
-  if (status == 0)
-  {
-    cdbi_query_t **temp;
-
-    temp = (cdbi_query_t **) realloc (queries,
-        sizeof (*queries) * (queries_num + 1));
-    if (temp == NULL)
-    {
-      ERROR ("dbi plugin: realloc failed");
-      status = -1;
-    }
-    else
-    {
-      queries = temp;
-      queries[queries_num] = q;
-      queries_num++;
-    }
-  }
-
-  if (status != 0)
-  {
-    cdbi_query_free (q);
-    return (-1);
-  }
-
-  return (0);
-} /* }}} int cdbi_config_add_query */
-
 static int cdbi_config_add_database_driver_option (cdbi_database_t *db, /* {{{ */
     oconfig_item_t *ci)
 {
@@ -453,10 +214,11 @@ static int cdbi_config_add_database_driver_option (cdbi_database_t *db, /* {{{ *
 
   if ((ci->values_num != 2)
       || (ci->values[0].type != OCONFIG_TYPE_STRING)
-      || (ci->values[1].type != OCONFIG_TYPE_STRING))
+      || ((ci->values[1].type != OCONFIG_TYPE_STRING)
+        && (ci->values[1].type != OCONFIG_TYPE_NUMBER)))
   {
     WARNING ("dbi plugin: The `DriverOption' config option "
-        "needs exactly two string arguments.");
+        "needs exactly two arguments.");
     return (-1);
   }
 
@@ -470,6 +232,7 @@ static int cdbi_config_add_database_driver_option (cdbi_database_t *db, /* {{{ *
 
   db->driver_options = option;
   option = db->driver_options + db->driver_options_num;
+  memset (option, 0, sizeof (*option));
 
   option->key = strdup (ci->values[0].value.string);
   if (option->key == NULL)
@@ -478,69 +241,26 @@ static int cdbi_config_add_database_driver_option (cdbi_database_t *db, /* {{{ *
     return (-1);
   }
 
-  option->value = strdup (ci->values[1].value.string);
-  if (option->value == NULL)
+  if (ci->values[1].type == OCONFIG_TYPE_STRING)
   {
-    ERROR ("dbi plugin: strdup failed.");
-    sfree (option->key);
-    return (-1);
-  }
-
-  db->driver_options_num++;
-  return (0);
-} /* }}} int cdbi_config_add_database_driver_option */
-
-static int cdbi_config_add_database_query (cdbi_database_t *db, /* {{{ */
-    oconfig_item_t *ci)
-{
-  cdbi_query_t *q;
-  cdbi_query_t **temp;
-  size_t i;
-
-  if ((ci->values_num != 1)
-      || (ci->values[0].type != OCONFIG_TYPE_STRING))
-  {
-    WARNING ("dbi plugin: The `Query' config option "
-        "needs exactly one string argument.");
-    return (-1);
-  }
-
-  q = NULL;
-  for (i = 0; i < queries_num; i++)
-  {
-    if (strcasecmp (queries[i]->name, ci->values[0].value.string) == 0)
+    option->value.string = strdup (ci->values[1].value.string);
+    if (option->value.string == NULL)
     {
-      q = queries[i];
-      break;
+      ERROR ("dbi plugin: strdup failed.");
+      sfree (option->key);
+      return (-1);
     }
   }
-
-  if (q == NULL)
-  {
-    WARNING ("dbi plugin: Database `%s': Unknown query `%s'. "
-        "Please make sure that the <Query \"%s\"> block comes before "
-        "the <Database \"%s\"> block.",
-        db->name, ci->values[0].value.string,
-        ci->values[0].value.string, db->name);
-    return (-1);
-  }
-
-  temp = (cdbi_query_t **) realloc (db->queries,
-      sizeof (*db->queries) * (db->queries_num + 1));
-  if (temp == NULL)
-  {
-    ERROR ("dbi plugin: realloc failed");
-    return (-1);
-  }
   else
   {
-    db->queries = temp;
-    db->queries[db->queries_num] = q;
-    db->queries_num++;
+    assert (ci->values[1].type == OCONFIG_TYPE_NUMBER);
+    option->value.numeric = (int) (ci->values[1].value.number + .5);
+    option->is_numeric = 1;
   }
 
+  db->driver_options_num++;
   return (0);
-} /* }}} int cdbi_config_add_database_query */
+} /* }}} int cdbi_config_add_database_driver_option */
 
 static int cdbi_config_add_database (oconfig_item_t *ci) /* {{{ */
 {
@@ -564,7 +284,7 @@ static int cdbi_config_add_database (oconfig_item_t *ci) /* {{{ */
   }
   memset (db, 0, sizeof (*db));
 
-  status = cdbi_config_set_string (&db->name, ci);
+  status = cf_util_get_string (ci, &db->name);
   if (status != 0)
   {
     sfree (db);
@@ -577,13 +297,16 @@ static int cdbi_config_add_database (oconfig_item_t *ci) /* {{{ */
     oconfig_item_t *child = ci->children + i;
 
     if (strcasecmp ("Driver", child->key) == 0)
-      status = cdbi_config_set_string (&db->driver, child);
+      status = cf_util_get_string (child, &db->driver);
     else if (strcasecmp ("DriverOption", child->key) == 0)
       status = cdbi_config_add_database_driver_option (db, child);
     else if (strcasecmp ("SelectDB", child->key) == 0)
-      status = cdbi_config_set_string (&db->select_db, child);
+      status = cf_util_get_string (child, &db->select_db);
     else if (strcasecmp ("Query", child->key) == 0)
-      status = cdbi_config_add_database_query (db, child);
+      status = udb_query_pick_from_list (child, queries, queries_num,
+          &db->queries, &db->queries_num);
+    else if (strcasecmp ("Host", child->key) == 0)
+      status = cf_util_get_string (child, &db->host);
     else
     {
       WARNING ("dbi plugin: Option `%s' not allowed here.", child->key);
@@ -611,6 +334,34 @@ static int cdbi_config_add_database (oconfig_item_t *ci) /* {{{ */
     break;
   } /* while (status == 0) */
 
+  while ((status == 0) && (db->queries_num > 0))
+  {
+    db->q_prep_areas = (udb_query_preparation_area_t **) calloc (
+        db->queries_num, sizeof (*db->q_prep_areas));
+
+    if (db->q_prep_areas == NULL)
+    {
+      WARNING ("dbi plugin: malloc failed");
+      status = -1;
+      break;
+    }
+
+    for (i = 0; i < db->queries_num; ++i)
+    {
+      db->q_prep_areas[i]
+        = udb_query_allocate_preparation_area (db->queries[i]);
+
+      if (db->q_prep_areas[i] == NULL)
+      {
+        WARNING ("dbi plugin: udb_query_allocate_preparation_area failed");
+        status = -1;
+        break;
+      }
+    }
+
+    break;
+  }
+
   /* If all went well, add this database to the global list of databases. */
   if (status == 0)
   {
@@ -625,9 +376,24 @@ static int cdbi_config_add_database (oconfig_item_t *ci) /* {{{ */
     }
     else
     {
+      user_data_t ud;
+      char *name = NULL;
+
       databases = temp;
       databases[databases_num] = db;
       databases_num++;
+
+      memset (&ud, 0, sizeof (ud));
+      ud.data = (void *) db;
+      ud.free_func = NULL;
+      name = ssnprintf_alloc("dbi:%s", db->name);
+
+      plugin_register_complex_read (/* group = */ NULL,
+          /* name = */ name ? name : db->name,
+          /* callback = */ cdbi_read_database,
+          /* interval = */ NULL,
+          /* user_data = */ &ud);
+      free (name);
     }
   }
 
@@ -648,12 +414,13 @@ static int cdbi_config (oconfig_item_t *ci) /* {{{ */
   {
     oconfig_item_t *child = ci->children + i;
     if (strcasecmp ("Query", child->key) == 0)
-      cdbi_config_add_query (child);
+      udb_query_create (&queries, &queries_num, child,
+          /* callback = */ NULL);
     else if (strcasecmp ("Database", child->key) == 0)
       cdbi_config_add_database (child);
     else
     {
-      WARNING ("snmp plugin: Ignoring unknown config option `%s'.", child->key);
+      WARNING ("dbi plugin: Ignoring unknown config option `%s'.", child->key);
     }
   } /* for (ci->children) */
 
@@ -704,158 +471,186 @@ static int cdbi_init (void) /* {{{ */
 } /* }}} int cdbi_init */
 
 static int cdbi_read_database_query (cdbi_database_t *db, /* {{{ */
-    cdbi_query_t *q)
+    udb_query_t *q, udb_query_preparation_area_t *prep_area)
 {
+  const char *statement;
   dbi_result res;
-  char **instances;
-  value_t *values;
-  const data_set_t *ds;
-  size_t i;
+  size_t column_num;
+  char **column_names;
+  char **column_values;
   int status;
-
-  res = NULL;
-  instances = NULL;
-  values = NULL;
+  size_t i;
 
   /* Macro that cleans up dynamically allocated memory and returns the
    * specified status. */
 #define BAIL_OUT(status) \
+  if (column_names != NULL) { sfree (column_names[0]); sfree (column_names); } \
+  if (column_values != NULL) { sfree (column_values[0]); sfree (column_values); } \
   if (res != NULL) { dbi_result_free (res); res = NULL; } \
-  if (instances != NULL) { sfree (instances[0]); sfree (instances); } \
-  sfree (values); \
   return (status)
 
-  ds = plugin_get_ds (q->type);
-  if (ds == NULL)
+  column_names = NULL;
+  column_values = NULL;
+  res = NULL;
+
+  statement = udb_query_get_statement (q);
+  assert (statement != NULL);
+
+  res = dbi_conn_query (db->connection, statement);
+  if (res == NULL)
   {
-    ERROR ("dbi plugin: cdbi_read_database_query: Query `%s': Type `%s' is not "
-        "known by the daemon. See types.db(5) for details.",
-        q->name, q->type);
+    char errbuf[1024];
+    ERROR ("dbi plugin: cdbi_read_database_query (%s, %s): "
+        "dbi_conn_query failed: %s",
+        db->name, udb_query_get_name (q),
+        cdbi_strerror (db->connection, errbuf, sizeof (errbuf)));
     BAIL_OUT (-1);
   }
+  else /* Get the number of columns */
+  {
+    unsigned int db_status;
+
+    db_status = dbi_result_get_numfields (res);
+    if (db_status == DBI_FIELD_ERROR)
+    {
+      char errbuf[1024];
+      ERROR ("dbi plugin: cdbi_read_database_query (%s, %s): "
+          "dbi_result_get_numfields failed: %s",
+          db->name, udb_query_get_name (q),
+          cdbi_strerror (db->connection, errbuf, sizeof (errbuf)));
+      BAIL_OUT (-1);
+    }
+
+    column_num = (size_t) db_status;
+    DEBUG ("cdbi_read_database_query (%s, %s): There are %zu columns.",
+        db->name, udb_query_get_name (q), column_num);
+  }
 
-  if (((size_t) ds->ds_num) != q->values_num)
+  /* Allocate `column_names' and `column_values'. {{{ */
+  column_names = (char **) calloc (column_num, sizeof (char *));
+  if (column_names == NULL)
   {
-    ERROR ("dbi plugin: cdbi_read_database_query: Query `%s': The type `%s' "
-        "requires exactly %i value%s, but the configuration specifies %zu.",
-        q->name, q->type,
-        ds->ds_num, (ds->ds_num == 1) ? "" : "s",
-        q->values_num);
+    ERROR ("dbi plugin: malloc failed.");
     BAIL_OUT (-1);
   }
 
-  /* Allocate `instances' and `values' {{{ */
-  instances = (char **) malloc (sizeof (*instances) * q->instances_num);
-  if (instances == NULL)
+  column_names[0] = (char *) calloc (column_num,
+      DATA_MAX_NAME_LEN * sizeof (char));
+  if (column_names[0] == NULL)
   {
     ERROR ("dbi plugin: malloc failed.");
     BAIL_OUT (-1);
   }
+  for (i = 1; i < column_num; i++)
+    column_names[i] = column_names[i - 1] + DATA_MAX_NAME_LEN;
 
-  instances[0] = (char *) malloc (q->instances_num * DATA_MAX_NAME_LEN);
-  if (instances[0] == NULL)
+  column_values = (char **) calloc (column_num, sizeof (char *));
+  if (column_values == NULL)
   {
     ERROR ("dbi plugin: malloc failed.");
     BAIL_OUT (-1);
   }
-  for (i = 1; i < q->instances_num; i++)
-    instances[i] = instances[i - 1] + DATA_MAX_NAME_LEN;
 
-  values = (value_t *) malloc (sizeof (*values) * q->values_num);
-  if (values == NULL)
+  column_values[0] = (char *) calloc (column_num,
+      DATA_MAX_NAME_LEN * sizeof (char));
+  if (column_values[0] == NULL)
   {
     ERROR ("dbi plugin: malloc failed.");
     BAIL_OUT (-1);
   }
+  for (i = 1; i < column_num; i++)
+    column_values[i] = column_values[i - 1] + DATA_MAX_NAME_LEN;
   /* }}} */
 
-  res = dbi_conn_query (db->connection, q->statement);
-  if (res == NULL)
+  /* Copy the field names to `column_names' */
+  for (i = 0; i < column_num; i++) /* {{{ */
   {
-    char errbuf[1024];
-    ERROR ("dbi plugin: cdbi_read_database_query (%s, %s): "
-        "dbi_conn_query failed: %s",
-        db->name, q->name,
-        cdbi_strerror (db->connection, errbuf, sizeof (errbuf)));
-    BAIL_OUT (-1);
-  }
+    const char *column_name;
 
-  status = dbi_result_first_row (res);
+    column_name = dbi_result_get_field_name (res, (unsigned int) (i + 1));
+    if (column_name == NULL)
+    {
+      ERROR ("dbi plugin: cdbi_read_database_query (%s, %s): "
+          "Cannot retrieve name of field %zu.",
+          db->name, udb_query_get_name (q), i + 1);
+      BAIL_OUT (-1);
+    }
+
+    sstrncpy (column_names[i], column_name, DATA_MAX_NAME_LEN);
+  } /* }}} for (i = 0; i < column_num; i++) */
+
+  udb_query_prepare_result (q, prep_area, (db->host ? db->host : hostname_g),
+      /* plugin = */ "dbi", db->name,
+      column_names, column_num, /* interval = */ 0);
+
+  /* 0 = error; 1 = success; */
+  status = dbi_result_first_row (res); /* {{{ */
   if (status != 1)
   {
     char errbuf[1024];
     ERROR ("dbi plugin: cdbi_read_database_query (%s, %s): "
         "dbi_result_first_row failed: %s. Maybe the statement didn't "
         "return any rows?",
-        db->name, q->name, 
+        db->name, udb_query_get_name (q),
         cdbi_strerror (db->connection, errbuf, sizeof (errbuf)));
+    udb_query_finish_result (q, prep_area);
     BAIL_OUT (-1);
-  }
+  } /* }}} */
 
-  while (42)
+  /* Iterate over all rows and call `udb_query_handle_result' with each list of
+   * values. */
+  while (42) /* {{{ */
   {
-    /* Get instance names and values from the result: */
-    for (i = 0; i < q->instances_num; i++) /* {{{ */
+    status = 0;
+    /* Copy the value of the columns to `column_values' */
+    for (i = 0; i < column_num; i++) /* {{{ */
     {
-      const char *inst;
+      status = cdbi_result_get_field (res, (unsigned int) (i + 1),
+          column_values[i], DATA_MAX_NAME_LEN);
 
-      inst = dbi_result_get_string (res, q->instances[i]);
-      if (dbi_conn_error_flag (db->connection) != 0)
+      if (status != 0)
       {
-        char errbuf[1024];
         ERROR ("dbi plugin: cdbi_read_database_query (%s, %s): "
-            "dbi_result_get_string (%s) failed: %s",
-            db->name, q->name, q->instances[i],
-            cdbi_strerror (db->connection, errbuf, sizeof (errbuf)));
-        BAIL_OUT (-1);
+            "cdbi_result_get_field (%zu) failed.",
+            db->name, udb_query_get_name (q), i + 1);
+        status = -1;
+        break;
       }
+    } /* }}} for (i = 0; i < column_num; i++) */
 
-      sstrncpy (instances[i], (inst == NULL) ? "" : inst, DATA_MAX_NAME_LEN);
-      DEBUG ("dbi plugin: cdbi_read_database_query (%s, %s): "
-          "instances[%zu] = %s;",
-          db->name, q->name, i, instances[i]);
-    } /* }}} for (i = 0; i < q->instances_num; i++) */
-
-    for (i = 0; i < q->values_num; i++) /* {{{ */
+    /* If all values were copied successfully, call `udb_query_handle_result'
+     * to dispatch the row to the daemon. */
+    if (status == 0) /* {{{ */
     {
-      status = cdbi_result_get_field (res, q->values[i], ds->ds[i].type,
-          values + i);
+      status = udb_query_handle_result (q, prep_area, column_values);
       if (status != 0)
       {
-        BAIL_OUT (-1);
-      }
-
-      if (ds->ds[i].type == DS_TYPE_COUNTER)
-      {
-        DEBUG ("dbi plugin: cdbi_read_database_query (%s, %s): values[%zu] = %llu;",
-            db->name, q->name, i, values[i].counter);
-      }
-      else
-      {
-        DEBUG ("dbi plugin: cdbi_read_database_query (%s, %s): values[%zu] = %g;",
-            db->name, q->name, i, values[i].gauge);
+        ERROR ("dbi plugin: cdbi_read_database_query (%s, %s): "
+            "udb_query_handle_result failed.",
+            db->name, udb_query_get_name (q));
       }
-    } /* }}} for (i = 0; i < q->values_num; i++) */
-
-    /* Dispatch this row to the daemon. */
-    cdbi_submit (db, q, instances, values);
+    } /* }}} */
 
     /* Get the next row from the database. */
-    status = dbi_result_next_row (res);
+    status = dbi_result_next_row (res); /* {{{ */
     if (status != 1)
     {
-      if (dbi_conn_error_flag (db->connection) != 0)
+      if (dbi_conn_error (db->connection, NULL) != 0)
       {
         char errbuf[1024];
         WARNING ("dbi plugin: cdbi_read_database_query (%s, %s): "
             "dbi_result_next_row failed: %s.",
-            db->name, q->name,
+            db->name, udb_query_get_name (q),
             cdbi_strerror (db->connection, errbuf, sizeof (errbuf)));
       }
       break;
-    }
-  } /* while (42) */
+    } /* }}} */
+  } /* }}} while (42) */
 
+  /* Tell the db query interface that we're done with this query. */
+  udb_query_finish_result (q, prep_area);
+
+  /* Clean up and return `status = 0' (success) */
   BAIL_OUT (0);
 #undef BAIL_OUT
 } /* }}} int cdbi_read_database_query */
@@ -908,24 +703,38 @@ static int cdbi_connect_database (cdbi_database_t *db) /* {{{ */
    * trouble finding out how to configure the plugin correctly.. */
   for (i = 0; i < db->driver_options_num; i++)
   {
-    DEBUG ("dbi plugin: cdbi_connect_database (%s): "
-        "key = %s; value = %s;",
-        db->name,
-        db->driver_options[i].key,
-        db->driver_options[i].value);
-
-    status = dbi_conn_set_option (connection,
-        db->driver_options[i].key, db->driver_options[i].value);
-    if (status != 0)
+    if (db->driver_options[i].is_numeric)
     {
-      char errbuf[1024];
-      const char *opt;
+      status = dbi_conn_set_option_numeric (connection,
+          db->driver_options[i].key, db->driver_options[i].value.numeric);
+      if (status != 0)
+      {
+        char errbuf[1024];
+        ERROR ("dbi plugin: cdbi_connect_database (%s): "
+            "dbi_conn_set_option_numeric (\"%s\", %i) failed: %s.",
+            db->name,
+            db->driver_options[i].key, db->driver_options[i].value.numeric,
+            cdbi_strerror (connection, errbuf, sizeof (errbuf)));
+      }
+    }
+    else
+    {
+      status = dbi_conn_set_option (connection,
+          db->driver_options[i].key, db->driver_options[i].value.string);
+      if (status != 0)
+      {
+        char errbuf[1024];
+        ERROR ("dbi plugin: cdbi_connect_database (%s): "
+            "dbi_conn_set_option (\"%s\", \"%s\") failed: %s.",
+            db->name,
+            db->driver_options[i].key, db->driver_options[i].value.string,
+            cdbi_strerror (connection, errbuf, sizeof (errbuf)));
+      }
+    }
 
-      ERROR ("dbi plugin: cdbi_connect_database (%s): "
-          "dbi_conn_set_option (%s, %s) failed: %s.",
-          db->name,
-          db->driver_options[i].key, db->driver_options[i].value,
-          cdbi_strerror (connection, errbuf, sizeof (errbuf)));
+    if (status != 0)
+    {
+      char const *opt;
 
       INFO ("dbi plugin: This is a list of all options understood "
           "by the `%s' driver:", db->driver);
@@ -971,21 +780,34 @@ static int cdbi_connect_database (cdbi_database_t *db) /* {{{ */
   return (0);
 } /* }}} int cdbi_connect_database */
 
-static int cdbi_read_database (cdbi_database_t *db) /* {{{ */
+static int cdbi_read_database (user_data_t *ud) /* {{{ */
 {
+  cdbi_database_t *db = (cdbi_database_t *) ud->data;
   size_t i;
   int success;
   int status;
 
+  unsigned int db_version;
+
   status = cdbi_connect_database (db);
   if (status != 0)
     return (status);
   assert (db->connection != NULL);
 
+  db_version = dbi_conn_get_engine_version (db->connection);
+  /* TODO: Complain if `db_version == 0' */
+
   success = 0;
   for (i = 0; i < db->queries_num; i++)
   {
-    status = cdbi_read_database_query (db, db->queries[i]);
+    /* Check if we know the database's version and if so, if this query applies
+     * to that version. */
+    if ((db_version != 0)
+        && (udb_query_check_version (db->queries[i], db_version) == 0))
+      continue;
+
+    status = cdbi_read_database_query (db,
+        db->queries[i], db->q_prep_areas[i]);
     if (status == 0)
       success++;
   }
@@ -999,29 +821,6 @@ static int cdbi_read_database (cdbi_database_t *db) /* {{{ */
   return (0);
 } /* }}} int cdbi_read_database */
 
-static int cdbi_read (void) /* {{{ */
-{
-  size_t i;
-  int success = 0;
-  int status;
-
-  for (i = 0; i < databases_num; i++)
-  {
-    status = cdbi_read_database (databases[i]);
-    if (status == 0)
-      success++;
-  }
-
-  if (success == 0)
-  {
-    ERROR ("dbi plugin: No database could be read. Will return an error so "
-        "the plugin will be delayed.");
-    return (-1);
-  }
-
-  return (0);
-} /* }}} int cdbi_read */
-
 static int cdbi_shutdown (void) /* {{{ */
 {
   size_t i;
@@ -1038,9 +837,8 @@ static int cdbi_shutdown (void) /* {{{ */
   sfree (databases);
   databases_num = 0;
 
-  for (i = 0; i < queries_num; i++)
-    cdbi_query_free (queries[i]);
-  sfree (queries);
+  udb_query_free (queries, queries_num);
+  queries = NULL;
   queries_num = 0;
 
   return (0);
@@ -1050,7 +848,6 @@ void module_register (void) /* {{{ */
 {
   plugin_register_complex_config ("dbi", cdbi_config);
   plugin_register_init ("dbi", cdbi_init);
-  plugin_register_read ("dbi", cdbi_read);
   plugin_register_shutdown ("dbi", cdbi_shutdown);
 } /* }}} void module_register */