postgresql plugin: Added support for <Result> blocks.
[collectd.git] / src / postgresql.c
index ccd357c..bb2f0d2 100644 (file)
@@ -83,10 +83,17 @@ typedef enum {
 } c_psql_param_t;
 
 typedef struct {
-       char *type;
-       char *type_instance;
-       int   ds_type;
-} c_psql_col_t;
+       char  *type;
+       char  *instance_prefix;
+       char **instances_str;
+       int   *instances;
+       int    instances_num;
+       char **values_str; /* may be NULL, even if values_num != 0 in
+                             case the "Column" option has been used */
+       int   *values;
+       int   *ds_types;
+       int    values_num;
+} c_psql_result_t;
 
 typedef struct {
        char *name;
@@ -95,8 +102,8 @@ typedef struct {
        c_psql_param_t *params;
        int             params_num;
 
-       c_psql_col_t *cols;
-       int           cols_num;
+       c_psql_result_t *results;
+       int              results_num;
 
        int min_pg_version;
        int max_pg_version;
@@ -145,6 +152,54 @@ static int             queries_num      = 0;
 static c_psql_database_t *databases     = NULL;
 static int                databases_num = 0;
 
+static c_psql_result_t *c_psql_result_new (c_psql_query_t *query)
+{
+       c_psql_result_t *res;
+
+       ++query->results_num;
+       if (NULL == (query->results = (c_psql_result_t *)realloc (query->results,
+                                       query->results_num * sizeof (*query->results)))) {
+               log_err ("Out of memory.");
+               exit (5);
+       }
+       res = query->results + query->results_num - 1;
+
+       res->type    = NULL;
+
+       res->instance_prefix = NULL;
+       res->instances_str   = NULL;
+       res->instances       = NULL;
+       res->instances_num   = 0;
+
+       res->values_str = NULL;
+       res->values     = NULL;
+       res->ds_types   = NULL;
+       res->values_num = 0;
+       return res;
+} /* c_psql_result_new */
+
+static void c_psql_result_delete (c_psql_result_t *res)
+{
+       int i;
+
+       sfree (res->type);
+
+       sfree (res->instance_prefix);
+
+       for (i = 0; i < res->instances_num; ++i)
+               sfree (res->instances_str[i]);
+       sfree (res->instances_str);
+       sfree (res->instances);
+       res->instances_num = 0;
+
+       for (i = 0; (NULL != res->values_str) && (i < res->values_num); ++i)
+               sfree (res->values_str[i]);
+       sfree (res->values_str);
+       sfree (res->values);
+       sfree (res->ds_types);
+       res->values_num = 0;
+} /* c_psql_result_delete */
+
 static c_psql_query_t *c_psql_query_new (const char *name)
 {
        c_psql_query_t *query;
@@ -163,14 +218,46 @@ static c_psql_query_t *c_psql_query_new (const char *name)
        query->params     = NULL;
        query->params_num = 0;
 
-       query->cols     = NULL;
-       query->cols_num = 0;
+       query->results     = NULL;
+       query->results_num = 0;
 
        query->min_pg_version = 0;
        query->max_pg_version = INT_MAX;
        return query;
 } /* c_psql_query_new */
 
+static int c_psql_query_init (c_psql_query_t *query)
+{
+       int i;
+
+       /* Get the data set definitions for each query definition. */
+       for (i = 0; i < query->results_num; ++i) {
+               c_psql_result_t  *res = query->results + i;
+               const data_set_t *ds;
+
+               int j;
+
+               ds = plugin_get_ds (res->type);
+               if (NULL == ds) {
+                       log_err ("Result: Unknown type \"%s\".", res->type);
+                       return -1;
+               }
+
+               if (res->values_num != ds->ds_num) {
+                       log_err ("Result: Invalid type \"%s\" - "
+                                       "expected %i data source%s, got %i.",
+                                       res->type, res->values_num,
+                                       (1 == res->values_num) ? "" : "s",
+                                       ds->ds_num);
+                       return -1;
+               }
+
+               for (j = 0; j < res->values_num; ++j)
+                       res->ds_types[j] = ds->ds[j].type;
+       }
+       return 0;
+} /* c_psql_query_init */
+
 static void c_psql_query_delete (c_psql_query_t *query)
 {
        int i;
@@ -181,12 +268,10 @@ static void c_psql_query_delete (c_psql_query_t *query)
        sfree (query->params);
        query->params_num = 0;
 
-       for (i = 0; i < query->cols_num; ++i) {
-               sfree (query->cols[i].type);
-               sfree (query->cols[i].type_instance);
-       }
-       sfree (query->cols);
-       query->cols_num = 0;
+       for (i = 0; i < query->results_num; ++i)
+               c_psql_result_delete (query->results + i);
+       sfree (query->results);
+       query->results_num = 0;
        return;
 } /* c_psql_query_delete */
 
@@ -294,14 +379,18 @@ static void c_psql_database_delete (c_psql_database_t *db)
        return;
 } /* c_psql_database_delete */
 
-static void submit (const c_psql_database_t *db,
-               const char *type, const char *type_instance,
-               value_t *values, size_t values_len)
+static void submit (const c_psql_database_t *db, const c_psql_result_t *res,
+               char **instances, value_t *values)
 {
        value_list_t vl = VALUE_LIST_INIT;
 
+       int instances_num = res->instances_num;
+
+       if (NULL != res->instance_prefix)
+               ++instances_num;
+
        vl.values     = values;
-       vl.values_len = values_len;
+       vl.values_len = res->values_num;
        vl.time       = time (NULL);
 
        if (C_PSQL_IS_UNIX_DOMAIN_SOCKET (db->host)
@@ -313,42 +402,114 @@ static void submit (const c_psql_database_t *db,
        sstrncpy (vl.plugin, "postgresql", sizeof (vl.plugin));
        sstrncpy (vl.plugin_instance, db->database, sizeof (vl.plugin_instance));
 
-       sstrncpy (vl.type, type, sizeof (vl.type));
+       sstrncpy (vl.type, res->type, sizeof (vl.type));
 
-       if (NULL != type_instance)
-               sstrncpy (vl.type_instance, type_instance, sizeof (vl.type_instance));
+       if (0 < instances_num) {
+               vl.type_instance[sizeof (vl.type_instance) - 1] = '\0';
+               strjoin (vl.type_instance, sizeof (vl.type_instance),
+                               instances, instances_num, "-");
+
+               if ('\0' != vl.type_instance[sizeof (vl.type_instance) - 1]) {
+                       vl.type_instance[sizeof (vl.type_instance) - 1] = '\0';
+                       log_warn ("Truncated type instance: %s.", vl.type_instance);
+               }
+       }
 
        plugin_dispatch_values (&vl);
        return;
 } /* submit */
 
-static void submit_counter (const c_psql_database_t *db,
-               const char *type, const char *type_instance,
-               const char *value)
+static int c_psql_get_colnum (PGresult *pgres,
+               char **strings, int *numbers, int idx)
 {
-       value_t values[1];
+       int colnum;
 
-       if ((NULL == value) || ('\0' == *value))
-               return;
+       if (0 <= numbers[idx])
+               return numbers[idx];
 
-       values[0].counter = atoll (value);
-       submit (db, type, type_instance, values, 1);
-       return;
-} /* submit_counter */
+       colnum = PQfnumber (pgres, strings[idx]);
+       if (0 > colnum)
+               log_err ("No such column: %s.", strings[idx]);
+
+       numbers[idx] = colnum;
+       return colnum;
+} /* c_psql_get_colnum */
 
-static void submit_gauge (const c_psql_database_t *db,
-               const char *type, const char *type_instance,
-               const char *value)
+static void c_psql_dispatch_row (c_psql_database_t *db, c_psql_query_t *query,
+               PGresult *pgres, int row)
 {
-       value_t values[1];
+       int i;
 
-       if ((NULL == value) || ('\0' == *value))
-               return;
+       for (i = 0; i < query->results_num; ++i) {
+               c_psql_result_t *res = query->results + i;
 
-       values[0].gauge = atof (value);
-       submit (db, type, type_instance, values, 1);
-       return;
-} /* submit_gauge */
+               char   *instances[res->instances_num + 1];
+               value_t values[res->values_num];
+
+               int offset = 0, status = 0, j;
+
+               /* get the instance name */
+               if (NULL != res->instance_prefix) {
+                       instances[0] = res->instance_prefix;
+                       offset = 1;
+               }
+
+               for (j = 0; (0 == status) && (j < res->instances_num); ++j) {
+                       int col = c_psql_get_colnum (pgres,
+                                       res->instances_str, res->instances, j);
+
+                       if (0 > col) {
+                               status = -1;
+                               break;
+                       }
+
+                       instances[j + offset] = PQgetvalue (pgres, row, col);
+                       if (NULL == instances[j + offset])
+                               instances[j + offset] = "";
+               }
+
+               /* get the values */
+               for (j = 0; (0 == status) && (j < res->values_num); ++j) {
+                       int col = c_psql_get_colnum (pgres,
+                                       res->values_str, res->values, j);
+
+                       char *value_str;
+                       char *endptr = NULL;
+
+                       if (0 > col) {
+                               status = -1;
+                               break;
+                       }
+
+                       value_str = PQgetvalue (pgres, row, col);
+                       if ((NULL == value_str) || ('\0' == *value_str))
+                               value_str = "0";
+
+                       if (res->ds_types[j] == DS_TYPE_COUNTER)
+                               values[j].counter = (counter_t)strtoll (value_str, &endptr, 0);
+                       else if (res->ds_types[j] == DS_TYPE_GAUGE)
+                               values[j].gauge = (gauge_t)strtod (value_str, &endptr);
+                       else {
+                               log_err ("Invalid type \"%s\" (%i).",
+                                               res->type, res->ds_types[j]);
+                       }
+
+                       if (value_str == endptr) {
+                               log_err ("Failed to parse string as number: %s.", value_str);
+                               status = -1;
+                               break;
+                       }
+                       else if ((NULL != endptr) && ('\0' != *endptr))
+                               log_warn ("Ignoring trailing garbage after number: %s.",
+                                               endptr);
+               }
+
+               if (0 != status)
+                       continue;
+
+               submit (db, res, instances, values);
+       }
+} /* c_psql_dispatch_row */
 
 static int c_psql_check_connection (c_psql_database_t *db)
 {
@@ -463,28 +624,9 @@ static int c_psql_exec_query (c_psql_database_t *db, int idx)
        }
 
        cols = PQnfields (res);
-       if (query->cols_num != cols) {
-               log_err ("SQL query returned wrong number of fields "
-                               "(expected: %i, got: %i)", query->cols_num, cols);
-               log_info ("SQL query was: %s", query->stmt);
-               PQclear (res);
-               return -1;
-       }
-
-       for (i = 0; i < rows; ++i) {
-               int j;
 
-               for (j = 0; j < cols; ++j) {
-                       c_psql_col_t col = query->cols[j];
-
-                       char *value = PQgetvalue (res, i, j);
-
-                       if (col.ds_type == DS_TYPE_COUNTER)
-                               submit_counter (db, col.type, col.type_instance, value);
-                       else if (col.ds_type == DS_TYPE_GAUGE)
-                               submit_gauge (db, col.type, col.type_instance, value);
-               }
-       }
+       for (i = 0; i < rows; ++i)
+               c_psql_dispatch_row (db, query, res, i);
        PQclear (res);
        return 0;
 } /* c_psql_exec_query */
@@ -550,32 +692,11 @@ static int c_psql_init (void)
        if ((NULL == databases) || (0 == databases_num))
                return 0;
 
-       for (i = 0; i < queries_num; ++i) {
-               c_psql_query_t *query = queries + i;
-               int j;
-
-               for (j = 0; j < query->cols_num; ++j) {
-                       c_psql_col_t     *col = query->cols + j;
-                       const data_set_t *ds;
-
-                       ds = plugin_get_ds (col->type);
-                       if (NULL == ds) {
-                               log_err ("Column: Unknown type \"%s\".", col->type);
-                               c_psql_shutdown ();
-                               return -1;
-                       }
-
-                       if (1 != ds->ds_num) {
-                               log_err ("Column: Invalid type \"%s\" - types defining "
-                                               "one data source are supported only (got: %i).",
-                                               col->type, ds->ds_num);
-                               c_psql_shutdown ();
-                               return -1;
-                       }
-
-                       col->ds_type = ds->ds[0].type;
+       for (i = 0; i < queries_num; ++i)
+               if (0 != c_psql_query_init (queries + i)) {
+                       c_psql_shutdown ();
+                       return -1;
                }
-       }
 
        for (i = 0; i < databases_num; ++i) {
                c_psql_database_t *db = databases + i;
@@ -661,6 +782,34 @@ static int config_set_i (char *name, int *var, const oconfig_item_t *ci)
        return 0;
 } /* config_set_i */
 
+static int config_append_array_s (char *name, char ***var, int *len,
+               const oconfig_item_t *ci)
+{
+       int i;
+
+       if ((0 != ci->children_num) || (1 > ci->values_num)) {
+               log_err ("%s expects at least one argument.", name);
+               return 1;
+       }
+
+       for (i = 0; i < ci->values_num; ++i) {
+               if (OCONFIG_TYPE_STRING != ci->values[i].type) {
+                       log_err ("%s expects string arguments.", name);
+                       return 1;
+               }
+       }
+
+       *len += ci->values_num;
+       if (NULL == (*var = (char **)realloc (*var, *len * sizeof (**var)))) {
+               log_err ("Out of memory.");
+               exit (5);
+       }
+
+       for (i = *len - ci->values_num; i < *len; ++i)
+               (*var)[i] = sstrdup (ci->values[i].value.string);
+       return 0;
+} /* config_append_array_s */
+
 static int config_set_param (c_psql_query_t *query, const oconfig_item_t *ci)
 {
        c_psql_param_t param;
@@ -695,9 +844,75 @@ static int config_set_param (c_psql_query_t *query, const oconfig_item_t *ci)
        return 0;
 } /* config_set_param */
 
-static int config_set_column (c_psql_query_t *query, const oconfig_item_t *ci)
+static int config_set_result (c_psql_query_t *query, const oconfig_item_t *ci)
 {
-       c_psql_col_t *col;
+       c_psql_result_t *res;
+
+       int status = 0, i;
+
+       if (0 != ci->values_num) {
+               log_err ("<Result> does not expect any arguments.");
+               return 1;
+       }
+
+       res = c_psql_result_new (query);
+
+       for (i = 0; i < ci->children_num; ++i) {
+               oconfig_item_t *c = ci->children + i;
+
+               if (0 == strcasecmp (c->key, "Type"))
+                       config_set_s ("Type", &res->type, c);
+               else if (0 == strcasecmp (c->key, "InstancePrefix"))
+                       config_set_s ("InstancePrefix", &res->instance_prefix, c);
+               else if (0 == strcasecmp (c->key, "InstancesFrom"))
+                       config_append_array_s ("InstancesFrom",
+                                       &res->instances_str, &res->instances_num, c);
+               else if (0 == strcasecmp (c->key, "ValuesFrom"))
+                       config_append_array_s ("ValuesFrom",
+                                       &res->values_str, &res->values_num, c);
+               else
+                       log_warn ("Ignoring unknown config key \"%s\".", c->key);
+       }
+
+       if (NULL == res->type) {
+               log_warn ("Query \"%s\": Missing Type option in <Result> block.",
+                               query->name);
+               status = 1;
+       }
+
+       if (NULL == res->values_str) {
+               log_warn ("Query \"%s\": Missing ValuesFrom option in <Result> block.",
+                               query->name);
+               status = 1;
+       }
+
+       if (0 != status) {
+               c_psql_result_delete (res);
+               --query->results_num;
+               return status;
+       }
+
+       /* preallocate memory to cache the column numbers and data types */
+       res->values = (int *)smalloc (res->values_num * sizeof (*res->values));
+       for (i = 0; i < res->values_num; ++i)
+               res->values[i] = -1;
+
+       res->instances = (int *)smalloc (res->instances_num
+                       * sizeof (*res->instances));
+       for (i = 0; i < res->instances_num; ++i)
+               res->instances[i] = -1;
+
+       res->ds_types = (int *)smalloc (res->values_num
+                       * sizeof (*res->ds_types));
+       for (i = 0; i < res->values_num; ++i)
+               res->ds_types[i] = -1;
+       return 0;
+} /* config_set_result */
+
+static int config_set_column (c_psql_query_t *query, int col_num,
+               const oconfig_item_t *ci)
+{
+       c_psql_result_t *res;
 
        int i;
 
@@ -714,20 +929,17 @@ static int config_set_column (c_psql_query_t *query, const oconfig_item_t *ci)
                }
        }
 
-       ++query->cols_num;
-       if (NULL == (query->cols = (c_psql_col_t *)realloc (query->cols,
-                               query->cols_num * sizeof (*query->cols)))) {
-               log_err ("Out of memory.");
-               exit (5);
-       }
+       res = c_psql_result_new (query);
 
-       col = query->cols + query->cols_num - 1;
+       res->type = sstrdup (ci->values[0].value.string);
 
-       col->ds_type = -1;
+       if (2 == ci->values_num)
+               res->instance_prefix = sstrdup (ci->values[1].value.string);
 
-       col->type = sstrdup (ci->values[0].value.string);
-       col->type_instance = (2 == ci->values_num)
-               ? sstrdup (ci->values[1].value.string) : NULL;
+       res->values     = (int *)smalloc (sizeof (*res->values));
+       res->values[0]  = col_num;
+       res->ds_types   = (int *)smalloc (sizeof (*res->ds_types));
+       res->values_num = 1;
        return 0;
 } /* config_set_column */
 
@@ -770,7 +982,7 @@ static int c_psql_config_query (oconfig_item_t *ci)
 {
        c_psql_query_t *query;
 
-       int status = 0, i;
+       int status = 0, col_num = 0, i;
 
        if ((1 != ci->values_num)
                        || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
@@ -792,8 +1004,15 @@ static int c_psql_config_query (oconfig_item_t *ci)
                }
                else if (0 == strcasecmp (c->key, "Param"))
                        config_set_param (query, c);
-               else if (0 == strcasecmp (c->key, "Column"))
-                       config_set_column (query, c);
+               else if (0 == strcasecmp (c->key, "Result"))
+                       config_set_result (query, c);
+               /* backwards compat for versions < 4.6 */
+               else if (0 == strcasecmp (c->key, "Column")) {
+                       log_warn ("<Query>: 'Column' is deprecated - "
+                                       "use a <Result> block instead.");
+                       config_set_column (query, col_num, c);
+                       ++col_num;
+               }
                else if (0 == strcasecmp (c->key, "MinPGVersion"))
                        config_set_i ("MinPGVersion", &query->min_pg_version, c);
                else if (0 == strcasecmp (c->key, "MaxPGVersion"))