X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fpostgresql.c;h=bb2f0d28e99c1c1ebab8540e006e64b81c181e7d;hb=0b350ce8049e8c5db34751340e015c7a7937664e;hp=ccd357c59fbb53cfc051b1c247f460c34299aa11;hpb=fb86aa60842b2b8ae3f00af34e685dcbf9ad2592;p=collectd.git diff --git a/src/postgresql.c b/src/postgresql.c index ccd357c5..bb2f0d28 100644 --- a/src/postgresql.c +++ b/src/postgresql.c @@ -83,10 +83,17 @@ typedef enum { } c_psql_param_t; typedef struct { - char *type; - char *type_instance; - int ds_type; -} c_psql_col_t; + char *type; + char *instance_prefix; + char **instances_str; + int *instances; + int instances_num; + char **values_str; /* may be NULL, even if values_num != 0 in + case the "Column" option has been used */ + int *values; + int *ds_types; + int values_num; +} c_psql_result_t; typedef struct { char *name; @@ -95,8 +102,8 @@ typedef struct { c_psql_param_t *params; int params_num; - c_psql_col_t *cols; - int cols_num; + c_psql_result_t *results; + int results_num; int min_pg_version; int max_pg_version; @@ -145,6 +152,54 @@ static int queries_num = 0; static c_psql_database_t *databases = NULL; static int databases_num = 0; +static c_psql_result_t *c_psql_result_new (c_psql_query_t *query) +{ + c_psql_result_t *res; + + ++query->results_num; + if (NULL == (query->results = (c_psql_result_t *)realloc (query->results, + query->results_num * sizeof (*query->results)))) { + log_err ("Out of memory."); + exit (5); + } + res = query->results + query->results_num - 1; + + res->type = NULL; + + res->instance_prefix = NULL; + res->instances_str = NULL; + res->instances = NULL; + res->instances_num = 0; + + res->values_str = NULL; + res->values = NULL; + res->ds_types = NULL; + res->values_num = 0; + return res; +} /* c_psql_result_new */ + +static void c_psql_result_delete (c_psql_result_t *res) +{ + int i; + + sfree (res->type); + + sfree (res->instance_prefix); + + for (i = 0; i < res->instances_num; ++i) + sfree (res->instances_str[i]); + sfree (res->instances_str); + sfree (res->instances); + res->instances_num = 0; + + for (i = 0; (NULL != res->values_str) && (i < res->values_num); ++i) + sfree (res->values_str[i]); + sfree (res->values_str); + sfree (res->values); + sfree (res->ds_types); + res->values_num = 0; +} /* c_psql_result_delete */ + static c_psql_query_t *c_psql_query_new (const char *name) { c_psql_query_t *query; @@ -163,14 +218,46 @@ static c_psql_query_t *c_psql_query_new (const char *name) query->params = NULL; query->params_num = 0; - query->cols = NULL; - query->cols_num = 0; + query->results = NULL; + query->results_num = 0; query->min_pg_version = 0; query->max_pg_version = INT_MAX; return query; } /* c_psql_query_new */ +static int c_psql_query_init (c_psql_query_t *query) +{ + int i; + + /* Get the data set definitions for each query definition. */ + for (i = 0; i < query->results_num; ++i) { + c_psql_result_t *res = query->results + i; + const data_set_t *ds; + + int j; + + ds = plugin_get_ds (res->type); + if (NULL == ds) { + log_err ("Result: Unknown type \"%s\".", res->type); + return -1; + } + + if (res->values_num != ds->ds_num) { + log_err ("Result: Invalid type \"%s\" - " + "expected %i data source%s, got %i.", + res->type, res->values_num, + (1 == res->values_num) ? "" : "s", + ds->ds_num); + return -1; + } + + for (j = 0; j < res->values_num; ++j) + res->ds_types[j] = ds->ds[j].type; + } + return 0; +} /* c_psql_query_init */ + static void c_psql_query_delete (c_psql_query_t *query) { int i; @@ -181,12 +268,10 @@ static void c_psql_query_delete (c_psql_query_t *query) sfree (query->params); query->params_num = 0; - for (i = 0; i < query->cols_num; ++i) { - sfree (query->cols[i].type); - sfree (query->cols[i].type_instance); - } - sfree (query->cols); - query->cols_num = 0; + for (i = 0; i < query->results_num; ++i) + c_psql_result_delete (query->results + i); + sfree (query->results); + query->results_num = 0; return; } /* c_psql_query_delete */ @@ -294,14 +379,18 @@ static void c_psql_database_delete (c_psql_database_t *db) return; } /* c_psql_database_delete */ -static void submit (const c_psql_database_t *db, - const char *type, const char *type_instance, - value_t *values, size_t values_len) +static void submit (const c_psql_database_t *db, const c_psql_result_t *res, + char **instances, value_t *values) { value_list_t vl = VALUE_LIST_INIT; + int instances_num = res->instances_num; + + if (NULL != res->instance_prefix) + ++instances_num; + vl.values = values; - vl.values_len = values_len; + vl.values_len = res->values_num; vl.time = time (NULL); if (C_PSQL_IS_UNIX_DOMAIN_SOCKET (db->host) @@ -313,42 +402,114 @@ static void submit (const c_psql_database_t *db, sstrncpy (vl.plugin, "postgresql", sizeof (vl.plugin)); sstrncpy (vl.plugin_instance, db->database, sizeof (vl.plugin_instance)); - sstrncpy (vl.type, type, sizeof (vl.type)); + sstrncpy (vl.type, res->type, sizeof (vl.type)); - if (NULL != type_instance) - sstrncpy (vl.type_instance, type_instance, sizeof (vl.type_instance)); + if (0 < instances_num) { + vl.type_instance[sizeof (vl.type_instance) - 1] = '\0'; + strjoin (vl.type_instance, sizeof (vl.type_instance), + instances, instances_num, "-"); + + if ('\0' != vl.type_instance[sizeof (vl.type_instance) - 1]) { + vl.type_instance[sizeof (vl.type_instance) - 1] = '\0'; + log_warn ("Truncated type instance: %s.", vl.type_instance); + } + } plugin_dispatch_values (&vl); return; } /* submit */ -static void submit_counter (const c_psql_database_t *db, - const char *type, const char *type_instance, - const char *value) +static int c_psql_get_colnum (PGresult *pgres, + char **strings, int *numbers, int idx) { - value_t values[1]; + int colnum; - if ((NULL == value) || ('\0' == *value)) - return; + if (0 <= numbers[idx]) + return numbers[idx]; - values[0].counter = atoll (value); - submit (db, type, type_instance, values, 1); - return; -} /* submit_counter */ + colnum = PQfnumber (pgres, strings[idx]); + if (0 > colnum) + log_err ("No such column: %s.", strings[idx]); + + numbers[idx] = colnum; + return colnum; +} /* c_psql_get_colnum */ -static void submit_gauge (const c_psql_database_t *db, - const char *type, const char *type_instance, - const char *value) +static void c_psql_dispatch_row (c_psql_database_t *db, c_psql_query_t *query, + PGresult *pgres, int row) { - value_t values[1]; + int i; - if ((NULL == value) || ('\0' == *value)) - return; + for (i = 0; i < query->results_num; ++i) { + c_psql_result_t *res = query->results + i; - values[0].gauge = atof (value); - submit (db, type, type_instance, values, 1); - return; -} /* submit_gauge */ + char *instances[res->instances_num + 1]; + value_t values[res->values_num]; + + int offset = 0, status = 0, j; + + /* get the instance name */ + if (NULL != res->instance_prefix) { + instances[0] = res->instance_prefix; + offset = 1; + } + + for (j = 0; (0 == status) && (j < res->instances_num); ++j) { + int col = c_psql_get_colnum (pgres, + res->instances_str, res->instances, j); + + if (0 > col) { + status = -1; + break; + } + + instances[j + offset] = PQgetvalue (pgres, row, col); + if (NULL == instances[j + offset]) + instances[j + offset] = ""; + } + + /* get the values */ + for (j = 0; (0 == status) && (j < res->values_num); ++j) { + int col = c_psql_get_colnum (pgres, + res->values_str, res->values, j); + + char *value_str; + char *endptr = NULL; + + if (0 > col) { + status = -1; + break; + } + + value_str = PQgetvalue (pgres, row, col); + if ((NULL == value_str) || ('\0' == *value_str)) + value_str = "0"; + + if (res->ds_types[j] == DS_TYPE_COUNTER) + values[j].counter = (counter_t)strtoll (value_str, &endptr, 0); + else if (res->ds_types[j] == DS_TYPE_GAUGE) + values[j].gauge = (gauge_t)strtod (value_str, &endptr); + else { + log_err ("Invalid type \"%s\" (%i).", + res->type, res->ds_types[j]); + } + + if (value_str == endptr) { + log_err ("Failed to parse string as number: %s.", value_str); + status = -1; + break; + } + else if ((NULL != endptr) && ('\0' != *endptr)) + log_warn ("Ignoring trailing garbage after number: %s.", + endptr); + } + + if (0 != status) + continue; + + submit (db, res, instances, values); + } +} /* c_psql_dispatch_row */ static int c_psql_check_connection (c_psql_database_t *db) { @@ -463,28 +624,9 @@ static int c_psql_exec_query (c_psql_database_t *db, int idx) } cols = PQnfields (res); - if (query->cols_num != cols) { - log_err ("SQL query returned wrong number of fields " - "(expected: %i, got: %i)", query->cols_num, cols); - log_info ("SQL query was: %s", query->stmt); - PQclear (res); - return -1; - } - - for (i = 0; i < rows; ++i) { - int j; - for (j = 0; j < cols; ++j) { - c_psql_col_t col = query->cols[j]; - - char *value = PQgetvalue (res, i, j); - - if (col.ds_type == DS_TYPE_COUNTER) - submit_counter (db, col.type, col.type_instance, value); - else if (col.ds_type == DS_TYPE_GAUGE) - submit_gauge (db, col.type, col.type_instance, value); - } - } + for (i = 0; i < rows; ++i) + c_psql_dispatch_row (db, query, res, i); PQclear (res); return 0; } /* c_psql_exec_query */ @@ -550,32 +692,11 @@ static int c_psql_init (void) if ((NULL == databases) || (0 == databases_num)) return 0; - for (i = 0; i < queries_num; ++i) { - c_psql_query_t *query = queries + i; - int j; - - for (j = 0; j < query->cols_num; ++j) { - c_psql_col_t *col = query->cols + j; - const data_set_t *ds; - - ds = plugin_get_ds (col->type); - if (NULL == ds) { - log_err ("Column: Unknown type \"%s\".", col->type); - c_psql_shutdown (); - return -1; - } - - if (1 != ds->ds_num) { - log_err ("Column: Invalid type \"%s\" - types defining " - "one data source are supported only (got: %i).", - col->type, ds->ds_num); - c_psql_shutdown (); - return -1; - } - - col->ds_type = ds->ds[0].type; + for (i = 0; i < queries_num; ++i) + if (0 != c_psql_query_init (queries + i)) { + c_psql_shutdown (); + return -1; } - } for (i = 0; i < databases_num; ++i) { c_psql_database_t *db = databases + i; @@ -661,6 +782,34 @@ static int config_set_i (char *name, int *var, const oconfig_item_t *ci) return 0; } /* config_set_i */ +static int config_append_array_s (char *name, char ***var, int *len, + const oconfig_item_t *ci) +{ + int i; + + if ((0 != ci->children_num) || (1 > ci->values_num)) { + log_err ("%s expects at least one argument.", name); + return 1; + } + + for (i = 0; i < ci->values_num; ++i) { + if (OCONFIG_TYPE_STRING != ci->values[i].type) { + log_err ("%s expects string arguments.", name); + return 1; + } + } + + *len += ci->values_num; + if (NULL == (*var = (char **)realloc (*var, *len * sizeof (**var)))) { + log_err ("Out of memory."); + exit (5); + } + + for (i = *len - ci->values_num; i < *len; ++i) + (*var)[i] = sstrdup (ci->values[i].value.string); + return 0; +} /* config_append_array_s */ + static int config_set_param (c_psql_query_t *query, const oconfig_item_t *ci) { c_psql_param_t param; @@ -695,9 +844,75 @@ static int config_set_param (c_psql_query_t *query, const oconfig_item_t *ci) return 0; } /* config_set_param */ -static int config_set_column (c_psql_query_t *query, const oconfig_item_t *ci) +static int config_set_result (c_psql_query_t *query, const oconfig_item_t *ci) { - c_psql_col_t *col; + c_psql_result_t *res; + + int status = 0, i; + + if (0 != ci->values_num) { + log_err (" does not expect any arguments."); + return 1; + } + + res = c_psql_result_new (query); + + for (i = 0; i < ci->children_num; ++i) { + oconfig_item_t *c = ci->children + i; + + if (0 == strcasecmp (c->key, "Type")) + config_set_s ("Type", &res->type, c); + else if (0 == strcasecmp (c->key, "InstancePrefix")) + config_set_s ("InstancePrefix", &res->instance_prefix, c); + else if (0 == strcasecmp (c->key, "InstancesFrom")) + config_append_array_s ("InstancesFrom", + &res->instances_str, &res->instances_num, c); + else if (0 == strcasecmp (c->key, "ValuesFrom")) + config_append_array_s ("ValuesFrom", + &res->values_str, &res->values_num, c); + else + log_warn ("Ignoring unknown config key \"%s\".", c->key); + } + + if (NULL == res->type) { + log_warn ("Query \"%s\": Missing Type option in block.", + query->name); + status = 1; + } + + if (NULL == res->values_str) { + log_warn ("Query \"%s\": Missing ValuesFrom option in block.", + query->name); + status = 1; + } + + if (0 != status) { + c_psql_result_delete (res); + --query->results_num; + return status; + } + + /* preallocate memory to cache the column numbers and data types */ + res->values = (int *)smalloc (res->values_num * sizeof (*res->values)); + for (i = 0; i < res->values_num; ++i) + res->values[i] = -1; + + res->instances = (int *)smalloc (res->instances_num + * sizeof (*res->instances)); + for (i = 0; i < res->instances_num; ++i) + res->instances[i] = -1; + + res->ds_types = (int *)smalloc (res->values_num + * sizeof (*res->ds_types)); + for (i = 0; i < res->values_num; ++i) + res->ds_types[i] = -1; + return 0; +} /* config_set_result */ + +static int config_set_column (c_psql_query_t *query, int col_num, + const oconfig_item_t *ci) +{ + c_psql_result_t *res; int i; @@ -714,20 +929,17 @@ static int config_set_column (c_psql_query_t *query, const oconfig_item_t *ci) } } - ++query->cols_num; - if (NULL == (query->cols = (c_psql_col_t *)realloc (query->cols, - query->cols_num * sizeof (*query->cols)))) { - log_err ("Out of memory."); - exit (5); - } + res = c_psql_result_new (query); - col = query->cols + query->cols_num - 1; + res->type = sstrdup (ci->values[0].value.string); - col->ds_type = -1; + if (2 == ci->values_num) + res->instance_prefix = sstrdup (ci->values[1].value.string); - col->type = sstrdup (ci->values[0].value.string); - col->type_instance = (2 == ci->values_num) - ? sstrdup (ci->values[1].value.string) : NULL; + res->values = (int *)smalloc (sizeof (*res->values)); + res->values[0] = col_num; + res->ds_types = (int *)smalloc (sizeof (*res->ds_types)); + res->values_num = 1; return 0; } /* config_set_column */ @@ -770,7 +982,7 @@ static int c_psql_config_query (oconfig_item_t *ci) { c_psql_query_t *query; - int status = 0, i; + int status = 0, col_num = 0, i; if ((1 != ci->values_num) || (OCONFIG_TYPE_STRING != ci->values[0].type)) { @@ -792,8 +1004,15 @@ static int c_psql_config_query (oconfig_item_t *ci) } else if (0 == strcasecmp (c->key, "Param")) config_set_param (query, c); - else if (0 == strcasecmp (c->key, "Column")) - config_set_column (query, c); + else if (0 == strcasecmp (c->key, "Result")) + config_set_result (query, c); + /* backwards compat for versions < 4.6 */ + else if (0 == strcasecmp (c->key, "Column")) { + log_warn (": 'Column' is deprecated - " + "use a block instead."); + config_set_column (query, col_num, c); + ++col_num; + } else if (0 == strcasecmp (c->key, "MinPGVersion")) config_set_i ("MinPGVersion", &query->min_pg_version, c); else if (0 == strcasecmp (c->key, "MaxPGVersion"))