X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fpostgresql.c;h=635abfca61bf5647b4d53cea55601a5182a8997e;hp=a8812e2158327bf86153689894e8eada7e620022;hb=633c3966f770e4d46651a2fe219a18d8a9907a9f;hpb=59c7ee1cafaf53814838794908dd84f8101334c7 diff --git a/src/postgresql.c b/src/postgresql.c index a8812e21..635abfca 100644 --- a/src/postgresql.c +++ b/src/postgresql.c @@ -1,35 +1,29 @@ /** * collectd - src/postgresql.c - * Copyright (C) 2008, 2009 Sebastian Harl - * Copyright (C) 2009 Florian Forster - * All rights reserved. + * Copyright (C) 2008-2012 Sebastian Harl + * Copyright (C) 2009 Florian Forster * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: * - * - Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. * - * - Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. * * Authors: * Sebastian Harl - * Florian Forster + * Florian Forster **/ /* @@ -42,15 +36,21 @@ #include "configfile.h" #include "plugin.h" +#include "utils_cache.h" #include "utils_db_query.h" #include "utils_complain.h" +#if HAVE_PTHREAD_H +# include +#endif + #include #include #define log_err(...) ERROR ("postgresql: " __VA_ARGS__) #define log_warn(...) WARNING ("postgresql: " __VA_ARGS__) #define log_info(...) INFO ("postgresql: " __VA_ARGS__) +#define log_debug(...) DEBUG ("postgresql: " __VA_ARGS__) #ifndef C_PSQL_DEFAULT_CONF # define C_PSQL_DEFAULT_CONF PKGDATADIR "/postgresql_default.conf" @@ -95,6 +95,7 @@ typedef enum { C_PSQL_PARAM_DB, C_PSQL_PARAM_USER, C_PSQL_PARAM_INTERVAL, + C_PSQL_PARAM_INSTANCE, } c_psql_param_t; /* Parameter configuration. Stored as `user data' in the query objects. */ @@ -104,6 +105,12 @@ typedef struct { } c_psql_user_data_t; typedef struct { + char *name; + char *statement; + _Bool store_rates; +} c_psql_writer_t; + +typedef struct { PGconn *conn; c_complain_t conn_complaint; @@ -117,19 +124,33 @@ typedef struct { udb_query_t **queries; size_t queries_num; + c_psql_writer_t **writers; + size_t writers_num; + + /* make sure we don't access the database object in parallel */ + pthread_mutex_t db_lock; + cdtime_t interval; + /* writer "caching" settings */ + cdtime_t commit_interval; + cdtime_t next_commit; + char *host; char *port; char *database; char *user; char *password; + char *instance; + char *sslmode; char *krbsrvname; char *service; + + int ref_cnt; } c_psql_database_t; static char *def_queries[] = { @@ -143,19 +164,77 @@ static char *def_queries[] = { }; static int def_queries_num = STATIC_ARRAY_SIZE (def_queries); -static udb_query_t **queries = NULL; -static size_t queries_num = 0; +static c_psql_database_t **databases = NULL; +static size_t databases_num = 0; + +static udb_query_t **queries = NULL; +static size_t queries_num = 0; + +static c_psql_writer_t *writers = NULL; +static size_t writers_num = 0; + +static int c_psql_begin (c_psql_database_t *db) +{ + PGresult *r = PQexec (db->conn, "BEGIN"); + + int status = 1; + + if (r != NULL) { + if (PGRES_COMMAND_OK == PQresultStatus (r)) { + db->next_commit = cdtime() + db->commit_interval; + status = 0; + } + else + log_warn ("Failed to initiate ('BEGIN') transaction: %s", + PQerrorMessage (db->conn)); + PQclear (r); + } + return status; +} /* c_psql_begin */ + +static int c_psql_commit (c_psql_database_t *db) +{ + PGresult *r = PQexec (db->conn, "COMMIT"); + + int status = 1; + + if (r != NULL) { + if (PGRES_COMMAND_OK == PQresultStatus (r)) { + db->next_commit = 0; + log_debug ("Successfully committed transaction."); + status = 0; + } + else + log_warn ("Failed to commit transaction: %s", + PQerrorMessage (db->conn)); + PQclear (r); + } + return status; +} /* c_psql_commit */ static c_psql_database_t *c_psql_database_new (const char *name) { - c_psql_database_t *db; + c_psql_database_t **tmp; + c_psql_database_t *db; - db = (c_psql_database_t *)malloc (sizeof (*db)); + db = (c_psql_database_t *)malloc (sizeof(*db)); if (NULL == db) { log_err ("Out of memory."); return NULL; } + tmp = (c_psql_database_t **)realloc (databases, + (databases_num + 1) * sizeof (*databases)); + if (NULL == tmp) { + log_err ("Out of memory."); + sfree (db); + return NULL; + } + + databases = tmp; + databases[databases_num] = db; + ++databases_num; + db->conn = NULL; C_COMPLAIN_INIT (&db->conn_complaint); @@ -169,19 +248,31 @@ static c_psql_database_t *c_psql_database_new (const char *name) db->queries = NULL; db->queries_num = 0; + db->writers = NULL; + db->writers_num = 0; + + pthread_mutex_init (&db->db_lock, /* attrs = */ NULL); + db->interval = 0; + db->commit_interval = 0; + db->next_commit = 0; + db->database = sstrdup (name); db->host = NULL; db->port = NULL; db->user = NULL; db->password = NULL; + db->instance = sstrdup (name); + db->sslmode = NULL; db->krbsrvname = NULL; db->service = NULL; + + db->ref_cnt = 0; return db; } /* c_psql_database_new */ @@ -191,6 +282,17 @@ static void c_psql_database_delete (void *data) c_psql_database_t *db = data; + --db->ref_cnt; + /* readers and writers may access this database */ + if (db->ref_cnt > 0) + return; + + /* wait for the lock to be released by the last writer */ + pthread_mutex_lock (&db->db_lock); + + if (db->next_commit > 0) + c_psql_commit (db); + PQfinish (db->conn); db->conn = NULL; @@ -202,17 +304,31 @@ static void c_psql_database_delete (void *data) sfree (db->queries); db->queries_num = 0; + sfree (db->writers); + db->writers_num = 0; + + pthread_mutex_unlock (&db->db_lock); + + pthread_mutex_destroy (&db->db_lock); + sfree (db->database); sfree (db->host); sfree (db->port); sfree (db->user); sfree (db->password); + sfree (db->instance); + sfree (db->sslmode); sfree (db->krbsrvname); sfree (db->service); + + /* don't care about freeing or reordering the 'databases' array + * this is done in 'shutdown'; also, don't free the database instance + * object just to make sure that in case anybody accesses it before + * shutdown won't segfault */ return; } /* c_psql_database_delete */ @@ -223,7 +339,7 @@ static int c_psql_connect (c_psql_database_t *db) int buf_len = sizeof (conninfo); int status; - if (! db) + if ((! db) || (! db->database)) return -1; status = ssnprintf (buf, buf_len, "dbname = '%s'", db->database); @@ -259,9 +375,6 @@ static int c_psql_check_connection (c_psql_database_t *db) c_psql_connect (db); } - /* "ping" */ - PQclear (PQexec (db->conn, "SELECT 42;")); - if (CONNECTION_OK != PQstatus (db->conn)) { PQreset (db->conn); @@ -271,8 +384,9 @@ static int c_psql_check_connection (c_psql_database_t *db) if (CONNECTION_OK != PQstatus (db->conn)) { c_complain (LOG_ERR, &db->conn_complaint, - "Failed to connect to database %s: %s", - db->database, PQerrorMessage (db->conn)); + "Failed to connect to database %s (%s): %s", + db->database, db->instance, + PQerrorMessage (db->conn)); return -1; } @@ -337,9 +451,13 @@ static PGresult *c_psql_exec_query_params (c_psql_database_t *db, case C_PSQL_PARAM_INTERVAL: ssnprintf (interval, sizeof (interval), "%.3f", (db->interval > 0) - ? CDTIME_T_TO_DOUBLE (db->interval) : interval_g); + ? CDTIME_T_TO_DOUBLE (db->interval) + : plugin_get_interval ()); params[i] = interval; break; + case C_PSQL_PARAM_INSTANCE: + params[i] = db->instance; + break; default: assert (0); } @@ -351,6 +469,7 @@ static PGresult *c_psql_exec_query_params (c_psql_database_t *db, NULL, NULL, /* return text data */ 0); } /* c_psql_exec_query_params */ +/* db->db_lock must be locked when calling this function */ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, udb_query_preparation_area_t *prep_area) { @@ -377,30 +496,46 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, else if ((NULL == data) || (0 == data->params_num)) res = c_psql_exec_query_noparams (db, q); else { - log_err ("Connection to database \"%s\" does not support parameters " - "(protocol version %d) - cannot execute query \"%s\".", - db->database, db->proto_version, + log_err ("Connection to database \"%s\" (%s) does not support " + "parameters (protocol version %d) - " + "cannot execute query \"%s\".", + db->database, db->instance, db->proto_version, udb_query_get_name (q)); return -1; } + /* give c_psql_write() a chance to acquire the lock if called recursively + * through dispatch_values(); this will happen if, both, queries and + * writers are configured for a single connection */ + pthread_mutex_unlock (&db->db_lock); + column_names = NULL; column_values = NULL; -#define BAIL_OUT(status) \ - sfree (column_names); \ - sfree (column_values); \ - PQclear (res); \ - return status - if (PGRES_TUPLES_OK != PQresultStatus (res)) { + pthread_mutex_lock (&db->db_lock); + + if ((CONNECTION_OK != PQstatus (db->conn)) + && (0 == c_psql_check_connection (db))) { + PQclear (res); + return c_psql_exec_query (db, q, prep_area); + } + log_err ("Failed to execute SQL query: %s", PQerrorMessage (db->conn)); log_info ("SQL query was: %s", udb_query_get_statement (q)); - BAIL_OUT (-1); + PQclear (res); + return -1; } +#define BAIL_OUT(status) \ + sfree (column_names); \ + sfree (column_values); \ + PQclear (res); \ + pthread_mutex_lock (&db->db_lock); \ + return status + rows_num = PQntuples (res); if (1 > rows_num) { BAIL_OUT (0); @@ -430,13 +565,14 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, } if (C_PSQL_IS_UNIX_DOMAIN_SOCKET (db->host) + || (0 == strcmp (db->host, "127.0.0.1")) || (0 == strcmp (db->host, "localhost"))) host = hostname_g; else host = db->host; status = udb_query_prepare_result (q, prep_area, host, "postgresql", - db->database, column_names, (size_t) column_num, db->interval); + db->instance, column_names, (size_t) column_num, db->interval); if (0 != status) { log_err ("udb_query_prepare_result failed with status %i.", status); @@ -487,9 +623,15 @@ static int c_psql_read (user_data_t *ud) db = ud->data; assert (NULL != db->database); + assert (NULL != db->instance); + assert (NULL != db->queries); + + pthread_mutex_lock (&db->db_lock); - if (0 != c_psql_check_connection (db)) + if (0 != c_psql_check_connection (db)) { + pthread_mutex_unlock (&db->db_lock); return -1; + } for (i = 0; i < db->queries_num; ++i) { @@ -507,34 +649,384 @@ static int c_psql_read (user_data_t *ud) success = 1; } + pthread_mutex_unlock (&db->db_lock); + if (! success) return -1; return 0; } /* c_psql_read */ +static char *values_name_to_sqlarray (const data_set_t *ds, + char *string, size_t string_len) +{ + char *str_ptr; + size_t str_len; + + int i; + + str_ptr = string; + str_len = string_len; + + for (i = 0; i < ds->ds_num; ++i) { + int status = ssnprintf (str_ptr, str_len, ",'%s'", ds->ds[i].name); + + if (status < 1) + return NULL; + else if ((size_t)status >= str_len) { + str_len = 0; + break; + } + else { + str_ptr += status; + str_len -= (size_t)status; + } + } + + if (str_len <= 2) { + log_err ("c_psql_write: Failed to stringify value names"); + return NULL; + } + + /* overwrite the first comma */ + string[0] = '{'; + str_ptr[0] = '}'; + str_ptr[1] = '\0'; + + return string; +} /* values_name_to_sqlarray */ + +static char *values_type_to_sqlarray (const data_set_t *ds, + char *string, size_t string_len, _Bool store_rates) +{ + char *str_ptr; + size_t str_len; + + int i; + + str_ptr = string; + str_len = string_len; + + for (i = 0; i < ds->ds_num; ++i) { + int status; + + if (store_rates) + status = ssnprintf(str_ptr, str_len, ",'gauge'"); + else + status = ssnprintf(str_ptr, str_len, ",'%s'", + DS_TYPE_TO_STRING (ds->ds[i].type)); + + if (status < 1) { + str_len = 0; + break; + } + else if ((size_t)status >= str_len) { + str_len = 0; + break; + } + else { + str_ptr += status; + str_len -= (size_t)status; + } + } + + if (str_len <= 2) { + log_err ("c_psql_write: Failed to stringify value types"); + return NULL; + } + + /* overwrite the first comma */ + string[0] = '{'; + str_ptr[0] = '}'; + str_ptr[1] = '\0'; + + return string; +} /* values_type_to_sqlarray */ + +static char *values_to_sqlarray (const data_set_t *ds, const value_list_t *vl, + char *string, size_t string_len, _Bool store_rates) +{ + char *str_ptr; + size_t str_len; + + gauge_t *rates = NULL; + + int i; + + str_ptr = string; + str_len = string_len; + + for (i = 0; i < vl->values_len; ++i) { + int status = 0; + + if ((ds->ds[i].type != DS_TYPE_GAUGE) + && (ds->ds[i].type != DS_TYPE_COUNTER) + && (ds->ds[i].type != DS_TYPE_DERIVE) + && (ds->ds[i].type != DS_TYPE_ABSOLUTE)) { + log_err ("c_psql_write: Unknown data source type: %i", + ds->ds[i].type); + sfree (rates); + return NULL; + } + + if (ds->ds[i].type == DS_TYPE_GAUGE) + status = ssnprintf (str_ptr, str_len, + ",%f", vl->values[i].gauge); + else if (store_rates) { + if (rates == NULL) + rates = uc_get_rate (ds, vl); + + if (rates == NULL) { + log_err ("c_psql_write: Failed to determine rate"); + return NULL; + } + + status = ssnprintf (str_ptr, str_len, + ",%lf", rates[i]); + } + else if (ds->ds[i].type == DS_TYPE_COUNTER) + status = ssnprintf (str_ptr, str_len, + ",%llu", vl->values[i].counter); + else if (ds->ds[i].type == DS_TYPE_DERIVE) + status = ssnprintf (str_ptr, str_len, + ",%"PRIi64, vl->values[i].derive); + else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) + status = ssnprintf (str_ptr, str_len, + ",%"PRIu64, vl->values[i].absolute); + + if (status < 1) { + str_len = 0; + break; + } + else if ((size_t)status >= str_len) { + str_len = 0; + break; + } + else { + str_ptr += status; + str_len -= (size_t)status; + } + } + + sfree (rates); + + if (str_len <= 2) { + log_err ("c_psql_write: Failed to stringify value list"); + return NULL; + } + + /* overwrite the first comma */ + string[0] = '{'; + str_ptr[0] = '}'; + str_ptr[1] = '\0'; + + return string; +} /* values_to_sqlarray */ + +static int c_psql_write (const data_set_t *ds, const value_list_t *vl, + user_data_t *ud) +{ + c_psql_database_t *db; + + char time_str[32]; + char values_name_str[1024]; + char values_type_str[1024]; + char values_str[1024]; + + const char *params[9]; + + int success = 0; + int i; + + if ((ud == NULL) || (ud->data == NULL)) { + log_err ("c_psql_write: Invalid user data."); + return -1; + } + + db = ud->data; + assert (db->database != NULL); + assert (db->writers != NULL); + + if (cdtime_to_iso8601 (time_str, sizeof (time_str), vl->time) == 0) { + log_err ("c_psql_write: Failed to convert time to ISO 8601 format"); + return -1; + } + + if (values_name_to_sqlarray (ds, + values_name_str, sizeof (values_name_str)) == NULL) + return -1; + +#define VALUE_OR_NULL(v) ((((v) == NULL) || (*(v) == '\0')) ? NULL : (v)) + + params[0] = time_str; + params[1] = vl->host; + params[2] = vl->plugin; + params[3] = VALUE_OR_NULL(vl->plugin_instance); + params[4] = vl->type; + params[5] = VALUE_OR_NULL(vl->type_instance); + params[6] = values_name_str; + +#undef VALUE_OR_NULL + + pthread_mutex_lock (&db->db_lock); + + if (0 != c_psql_check_connection (db)) { + pthread_mutex_unlock (&db->db_lock); + return -1; + } + + if ((db->commit_interval > 0) + && (db->next_commit == 0)) + c_psql_begin (db); + + for (i = 0; i < db->writers_num; ++i) { + c_psql_writer_t *writer; + PGresult *res; + + writer = db->writers[i]; + + if (values_type_to_sqlarray (ds, + values_type_str, sizeof (values_type_str), + writer->store_rates) == NULL) { + pthread_mutex_unlock (&db->db_lock); + return -1; + } + + if (values_to_sqlarray (ds, vl, + values_str, sizeof (values_str), + writer->store_rates) == NULL) { + pthread_mutex_unlock (&db->db_lock); + return -1; + } + + params[7] = values_type_str; + params[8] = values_str; + + res = PQexecParams (db->conn, writer->statement, + STATIC_ARRAY_SIZE (params), NULL, + (const char *const *)params, + NULL, NULL, /* return text data */ 0); + + if ((PGRES_COMMAND_OK != PQresultStatus (res)) + && (PGRES_TUPLES_OK != PQresultStatus (res))) { + PQclear (res); + + if ((CONNECTION_OK != PQstatus (db->conn)) + && (0 == c_psql_check_connection (db))) { + /* try again */ + res = PQexecParams (db->conn, writer->statement, + STATIC_ARRAY_SIZE (params), NULL, + (const char *const *)params, + NULL, NULL, /* return text data */ 0); + + if ((PGRES_COMMAND_OK == PQresultStatus (res)) + || (PGRES_TUPLES_OK == PQresultStatus (res))) { + PQclear (res); + success = 1; + continue; + } + } + + log_err ("Failed to execute SQL query: %s", + PQerrorMessage (db->conn)); + log_info ("SQL query was: '%s', " + "params: %s, %s, %s, %s, %s, %s, %s, %s", + writer->statement, + params[0], params[1], params[2], params[3], + params[4], params[5], params[6], params[7]); + + /* this will abort any current transaction -> restart */ + if (db->next_commit > 0) + c_psql_commit (db); + + pthread_mutex_unlock (&db->db_lock); + return -1; + } + + PQclear (res); + success = 1; + } + + if ((db->next_commit > 0) + && (cdtime () > db->next_commit)) + c_psql_commit (db); + + pthread_mutex_unlock (&db->db_lock); + + if (! success) + return -1; + return 0; +} /* c_psql_write */ + +/* We cannot flush single identifiers as all we do is to commit the currently + * running transaction, thus making sure that all written data is actually + * visible to everybody. */ +static int c_psql_flush (cdtime_t timeout, + __attribute__((unused)) const char *ident, + user_data_t *ud) +{ + c_psql_database_t **dbs = databases; + size_t dbs_num = databases_num; + size_t i; + + if ((ud != NULL) && (ud->data != NULL)) { + dbs = (void *)&ud->data; + dbs_num = 1; + } + + for (i = 0; i < dbs_num; ++i) { + c_psql_database_t *db = dbs[i]; + + /* don't commit if the timeout is larger than the regular commit + * interval as in that case all requested data has already been + * committed */ + if ((db->next_commit > 0) && (db->commit_interval > timeout)) + c_psql_commit (db); + } + return 0; +} /* c_psql_flush */ + static int c_psql_shutdown (void) { + size_t i = 0; + + _Bool had_flush = 0; + plugin_unregister_read_group ("postgresql"); + for (i = 0; i < databases_num; ++i) { + c_psql_database_t *db = databases[i]; + + if (db->writers_num > 0) { + char cb_name[DATA_MAX_NAME_LEN]; + ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", + db->database); + + if (! had_flush) { + plugin_unregister_flush ("postgresql"); + had_flush = 1; + } + + plugin_unregister_flush (cb_name); + plugin_unregister_write (cb_name); + } + + sfree (db); + } + udb_query_free (queries, queries_num); queries = NULL; queries_num = 0; - return 0; -} /* c_psql_shutdown */ + sfree (writers); + writers = NULL; + writers_num = 0; -static int config_set_s (char *name, char **var, const oconfig_item_t *ci) -{ - if ((0 != ci->children_num) || (1 != ci->values_num) - || (OCONFIG_TYPE_STRING != ci->values[0].type)) { - log_err ("%s expects a single string argument.", name); - return 1; - } + sfree (databases); + databases = NULL; + databases_num = 0; - sfree (*var); - *var = sstrdup (ci->values[0].value.string); return 0; -} /* config_set_s */ +} /* c_psql_shutdown */ static int config_query_param_add (udb_query_t *q, oconfig_item_t *ci) { @@ -571,6 +1063,8 @@ static int config_query_param_add (udb_query_t *q, oconfig_item_t *ci) data->params[data->params_num] = C_PSQL_PARAM_USER; else if (0 == strcasecmp (param_str, "interval")) data->params[data->params_num] = C_PSQL_PARAM_INTERVAL; + else if (0 == strcasecmp (param_str, "instance")) + data->params[data->params_num] = C_PSQL_PARAM_INSTANCE; else { log_err ("Invalid parameter \"%s\".", param_str); return 1; @@ -592,6 +1086,103 @@ static int config_query_callback (udb_query_t *q, oconfig_item_t *ci) return (-1); } /* config_query_callback */ +static int config_add_writer (oconfig_item_t *ci, + c_psql_writer_t *src_writers, size_t src_writers_num, + c_psql_writer_t ***dst_writers, size_t *dst_writers_num) +{ + char *name; + + size_t i; + + if ((ci == NULL) || (dst_writers == NULL) || (dst_writers_num == NULL)) + return -1; + + if ((ci->values_num != 1) + || (ci->values[0].type != OCONFIG_TYPE_STRING)) { + log_err ("`Writer' expects a single string argument."); + return 1; + } + + name = ci->values[0].value.string; + + for (i = 0; i < src_writers_num; ++i) { + c_psql_writer_t **tmp; + + if (strcasecmp (name, src_writers[i].name) != 0) + continue; + + tmp = (c_psql_writer_t **)realloc (*dst_writers, + sizeof (**dst_writers) * (*dst_writers_num + 1)); + if (tmp == NULL) { + log_err ("Out of memory."); + return -1; + } + + tmp[*dst_writers_num] = src_writers + i; + + *dst_writers = tmp; + ++(*dst_writers_num); + break; + } + + if (i >= src_writers_num) { + log_err ("No such writer: `%s'", name); + return -1; + } + + return 0; +} /* config_add_writer */ + +static int c_psql_config_writer (oconfig_item_t *ci) +{ + c_psql_writer_t *writer; + c_psql_writer_t *tmp; + + int status = 0; + int i; + + if ((ci->values_num != 1) + || (ci->values[0].type != OCONFIG_TYPE_STRING)) { + log_err (" expects a single string argument."); + return 1; + } + + tmp = (c_psql_writer_t *)realloc (writers, + sizeof (*writers) * (writers_num + 1)); + if (tmp == NULL) { + log_err ("Out of memory."); + return -1; + } + + writers = tmp; + writer = writers + writers_num; + ++writers_num; + + writer->name = sstrdup (ci->values[0].value.string); + writer->statement = NULL; + writer->store_rates = 1; + + for (i = 0; i < ci->children_num; ++i) { + oconfig_item_t *c = ci->children + i; + + if (strcasecmp ("Statement", c->key) == 0) + status = cf_util_get_string (c, &writer->statement); + else if (strcasecmp ("StoreRates", c->key) == 0) + status = cf_util_get_boolean (c, &writer->store_rates); + else + log_warn ("Ignoring unknown config key \"%s\".", c->key); + } + + if (status != 0) { + sfree (writer->statement); + sfree (writer->name); + sfree (writer); + return status; + } + + return 0; +} /* c_psql_config_writer */ + static int c_psql_config_database (oconfig_item_t *ci) { c_psql_database_t *db; @@ -600,6 +1191,8 @@ static int c_psql_config_database (oconfig_item_t *ci) struct timespec cb_interval = { 0, 0 }; user_data_t ud; + static _Bool have_flush = 0; + int i; if ((1 != ci->values_num) @@ -618,30 +1211,37 @@ static int c_psql_config_database (oconfig_item_t *ci) oconfig_item_t *c = ci->children + i; if (0 == strcasecmp (c->key, "Host")) - config_set_s ("Host", &db->host, c); + cf_util_get_string (c, &db->host); else if (0 == strcasecmp (c->key, "Port")) - config_set_s ("Port", &db->port, c); + cf_util_get_service (c, &db->port); else if (0 == strcasecmp (c->key, "User")) - config_set_s ("User", &db->user, c); + cf_util_get_string (c, &db->user); else if (0 == strcasecmp (c->key, "Password")) - config_set_s ("Password", &db->password, c); + cf_util_get_string (c, &db->password); + else if (0 == strcasecmp (c->key, "Instance")) + cf_util_get_string (c, &db->instance); else if (0 == strcasecmp (c->key, "SSLMode")) - config_set_s ("SSLMode", &db->sslmode, c); + cf_util_get_string (c, &db->sslmode); else if (0 == strcasecmp (c->key, "KRBSrvName")) - config_set_s ("KRBSrvName", &db->krbsrvname, c); + cf_util_get_string (c, &db->krbsrvname); else if (0 == strcasecmp (c->key, "Service")) - config_set_s ("Service", &db->service, c); + cf_util_get_string (c, &db->service); else if (0 == strcasecmp (c->key, "Query")) udb_query_pick_from_list (c, queries, queries_num, &db->queries, &db->queries_num); + else if (0 == strcasecmp (c->key, "Writer")) + config_add_writer (c, writers, writers_num, + &db->writers, &db->writers_num); else if (0 == strcasecmp (c->key, "Interval")) cf_util_get_cdtime (c, &db->interval); + else if (strcasecmp ("CommitInterval", c->key) == 0) + cf_util_get_cdtime (c, &db->commit_interval); else log_warn ("Ignoring unknown config key \"%s\".", c->key); } /* If no `Query' options were given, add the default queries.. */ - if (db->queries_num == 0) { + if ((db->queries_num == 0) && (db->writers_num == 0)){ for (i = 0; i < def_queries_num; i++) udb_query_pick_from_list_by_name (def_queries[i], queries, queries_num, @@ -678,13 +1278,36 @@ static int c_psql_config_database (oconfig_item_t *ci) ud.data = db; ud.free_func = c_psql_database_delete; - ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->database); + ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->instance); + + if (db->queries_num > 0) { + CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval); - CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval); + ++db->ref_cnt; + plugin_register_complex_read ("postgresql", cb_name, c_psql_read, + /* interval = */ (db->interval > 0) ? &cb_interval : NULL, + &ud); + } + if (db->writers_num > 0) { + ++db->ref_cnt; + plugin_register_write (cb_name, c_psql_write, &ud); + + if (! have_flush) { + /* flush all */ + plugin_register_flush ("postgresql", + c_psql_flush, /* user data = */ NULL); + have_flush = 1; + } - plugin_register_complex_read ("postgresql", cb_name, c_psql_read, - /* interval = */ (db->interval > 0) ? &cb_interval : NULL, - &ud); + /* flush this connection only */ + ++db->ref_cnt; + plugin_register_flush (cb_name, c_psql_flush, &ud); + } + else if (db->commit_interval > 0) { + log_warn ("Database '%s': You do not have any writers assigned to " + "this database connection. Setting 'CommitInterval' does " + "not have any effect.", db->database); + } return 0; } /* c_psql_config_database */ @@ -716,6 +1339,8 @@ static int c_psql_config (oconfig_item_t *ci) if (0 == strcasecmp (c->key, "Query")) udb_query_create (&queries, &queries_num, c, /* callback = */ config_query_callback); + else if (0 == strcasecmp (c->key, "Writer")) + c_psql_config_writer (c); else if (0 == strcasecmp (c->key, "Database")) c_psql_config_database (c); else