X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fpostgresql.c;h=635abfca61bf5647b4d53cea55601a5182a8997e;hp=8a9fa10186fa6ae102435cd9f0c1cd9996b0ccde;hb=633c3966f770e4d46651a2fe219a18d8a9907a9f;hpb=f1afdb3de66275f48ca99ec57dafe4ac8b52bf97 diff --git a/src/postgresql.c b/src/postgresql.c index 8a9fa101..635abfca 100644 --- a/src/postgresql.c +++ b/src/postgresql.c @@ -2,34 +2,28 @@ * collectd - src/postgresql.c * Copyright (C) 2008-2012 Sebastian Harl * Copyright (C) 2009 Florian Forster - * All rights reserved. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: * - * - Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. * - * - Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. * * Authors: * Sebastian Harl - * Florian Forster + * Florian Forster **/ /* @@ -101,6 +95,7 @@ typedef enum { C_PSQL_PARAM_DB, C_PSQL_PARAM_USER, C_PSQL_PARAM_INTERVAL, + C_PSQL_PARAM_INSTANCE, } c_psql_param_t; /* Parameter configuration. Stored as `user data' in the query objects. */ @@ -147,11 +142,15 @@ typedef struct { char *user; char *password; + char *instance; + char *sslmode; char *krbsrvname; char *service; + + int ref_cnt; } c_psql_database_t; static char *def_queries[] = { @@ -165,22 +164,77 @@ static char *def_queries[] = { }; static int def_queries_num = STATIC_ARRAY_SIZE (def_queries); -static udb_query_t **queries = NULL; -static size_t queries_num = 0; +static c_psql_database_t **databases = NULL; +static size_t databases_num = 0; + +static udb_query_t **queries = NULL; +static size_t queries_num = 0; + +static c_psql_writer_t *writers = NULL; +static size_t writers_num = 0; + +static int c_psql_begin (c_psql_database_t *db) +{ + PGresult *r = PQexec (db->conn, "BEGIN"); -static c_psql_writer_t *writers = NULL; -static size_t writers_num = 0; + int status = 1; + + if (r != NULL) { + if (PGRES_COMMAND_OK == PQresultStatus (r)) { + db->next_commit = cdtime() + db->commit_interval; + status = 0; + } + else + log_warn ("Failed to initiate ('BEGIN') transaction: %s", + PQerrorMessage (db->conn)); + PQclear (r); + } + return status; +} /* c_psql_begin */ + +static int c_psql_commit (c_psql_database_t *db) +{ + PGresult *r = PQexec (db->conn, "COMMIT"); + + int status = 1; + + if (r != NULL) { + if (PGRES_COMMAND_OK == PQresultStatus (r)) { + db->next_commit = 0; + log_debug ("Successfully committed transaction."); + status = 0; + } + else + log_warn ("Failed to commit transaction: %s", + PQerrorMessage (db->conn)); + PQclear (r); + } + return status; +} /* c_psql_commit */ static c_psql_database_t *c_psql_database_new (const char *name) { - c_psql_database_t *db; + c_psql_database_t **tmp; + c_psql_database_t *db; - db = (c_psql_database_t *)malloc (sizeof (*db)); + db = (c_psql_database_t *)malloc (sizeof(*db)); if (NULL == db) { log_err ("Out of memory."); return NULL; } + tmp = (c_psql_database_t **)realloc (databases, + (databases_num + 1) * sizeof (*databases)); + if (NULL == tmp) { + log_err ("Out of memory."); + sfree (db); + return NULL; + } + + databases = tmp; + databases[databases_num] = db; + ++databases_num; + db->conn = NULL; C_COMPLAIN_INIT (&db->conn_complaint); @@ -210,11 +264,15 @@ static c_psql_database_t *c_psql_database_new (const char *name) db->user = NULL; db->password = NULL; + db->instance = sstrdup (name); + db->sslmode = NULL; db->krbsrvname = NULL; db->service = NULL; + + db->ref_cnt = 0; return db; } /* c_psql_database_new */ @@ -224,6 +282,17 @@ static void c_psql_database_delete (void *data) c_psql_database_t *db = data; + --db->ref_cnt; + /* readers and writers may access this database */ + if (db->ref_cnt > 0) + return; + + /* wait for the lock to be released by the last writer */ + pthread_mutex_lock (&db->db_lock); + + if (db->next_commit > 0) + c_psql_commit (db); + PQfinish (db->conn); db->conn = NULL; @@ -238,8 +307,6 @@ static void c_psql_database_delete (void *data) sfree (db->writers); db->writers_num = 0; - /* wait for the lock to be released by the last writer */ - pthread_mutex_lock (&db->db_lock); pthread_mutex_unlock (&db->db_lock); pthread_mutex_destroy (&db->db_lock); @@ -250,11 +317,18 @@ static void c_psql_database_delete (void *data) sfree (db->user); sfree (db->password); + sfree (db->instance); + sfree (db->sslmode); sfree (db->krbsrvname); sfree (db->service); + + /* don't care about freeing or reordering the 'databases' array + * this is done in 'shutdown'; also, don't free the database instance + * object just to make sure that in case anybody accesses it before + * shutdown won't segfault */ return; } /* c_psql_database_delete */ @@ -265,7 +339,7 @@ static int c_psql_connect (c_psql_database_t *db) int buf_len = sizeof (conninfo); int status; - if (! db) + if ((! db) || (! db->database)) return -1; status = ssnprintf (buf, buf_len, "dbname = '%s'", db->database); @@ -301,9 +375,6 @@ static int c_psql_check_connection (c_psql_database_t *db) c_psql_connect (db); } - /* "ping" */ - PQclear (PQexec (db->conn, "SELECT 42;")); - if (CONNECTION_OK != PQstatus (db->conn)) { PQreset (db->conn); @@ -313,8 +384,9 @@ static int c_psql_check_connection (c_psql_database_t *db) if (CONNECTION_OK != PQstatus (db->conn)) { c_complain (LOG_ERR, &db->conn_complaint, - "Failed to connect to database %s: %s", - db->database, PQerrorMessage (db->conn)); + "Failed to connect to database %s (%s): %s", + db->database, db->instance, + PQerrorMessage (db->conn)); return -1; } @@ -379,9 +451,13 @@ static PGresult *c_psql_exec_query_params (c_psql_database_t *db, case C_PSQL_PARAM_INTERVAL: ssnprintf (interval, sizeof (interval), "%.3f", (db->interval > 0) - ? CDTIME_T_TO_DOUBLE (db->interval) : interval_g); + ? CDTIME_T_TO_DOUBLE (db->interval) + : plugin_get_interval ()); params[i] = interval; break; + case C_PSQL_PARAM_INSTANCE: + params[i] = db->instance; + break; default: assert (0); } @@ -420,9 +496,10 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, else if ((NULL == data) || (0 == data->params_num)) res = c_psql_exec_query_noparams (db, q); else { - log_err ("Connection to database \"%s\" does not support parameters " - "(protocol version %d) - cannot execute query \"%s\".", - db->database, db->proto_version, + log_err ("Connection to database \"%s\" (%s) does not support " + "parameters (protocol version %d) - " + "cannot execute query \"%s\".", + db->database, db->instance, db->proto_version, udb_query_get_name (q)); return -1; } @@ -438,6 +515,12 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, if (PGRES_TUPLES_OK != PQresultStatus (res)) { pthread_mutex_lock (&db->db_lock); + if ((CONNECTION_OK != PQstatus (db->conn)) + && (0 == c_psql_check_connection (db))) { + PQclear (res); + return c_psql_exec_query (db, q, prep_area); + } + log_err ("Failed to execute SQL query: %s", PQerrorMessage (db->conn)); log_info ("SQL query was: %s", @@ -482,13 +565,14 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, } if (C_PSQL_IS_UNIX_DOMAIN_SOCKET (db->host) + || (0 == strcmp (db->host, "127.0.0.1")) || (0 == strcmp (db->host, "localhost"))) host = hostname_g; else host = db->host; status = udb_query_prepare_result (q, prep_area, host, "postgresql", - db->database, column_names, (size_t) column_num, db->interval); + db->instance, column_names, (size_t) column_num, db->interval); if (0 != status) { log_err ("udb_query_prepare_result failed with status %i.", status); @@ -539,6 +623,7 @@ static int c_psql_read (user_data_t *ud) db = ud->data; assert (NULL != db->database); + assert (NULL != db->instance); assert (NULL != db->queries); pthread_mutex_lock (&db->db_lock); @@ -737,45 +822,6 @@ static char *values_to_sqlarray (const data_set_t *ds, const value_list_t *vl, return string; } /* values_to_sqlarray */ -static int c_psql_begin (c_psql_database_t *db) -{ - PGresult *r = PQexec (db->conn, "BEGIN"); - - int status = 1; - - if (r != NULL) { - if (PGRES_COMMAND_OK == PQresultStatus (r)) { - db->next_commit = cdtime() + db->commit_interval; - status = 0; - } - else - log_warn ("Failed to initiate ('BEGIN') transaction: %s", - PQerrorMessage (db->conn)); - PQclear (r); - } - return status; -} /* c_psql_begin */ - -static int c_psql_commit (c_psql_database_t *db) -{ - PGresult *r = PQexec (db->conn, "COMMIT"); - - int status = 1; - - if (r != NULL) { - if (PGRES_COMMAND_OK == PQresultStatus (r)) { - db->next_commit = cdtime () + db->commit_interval; - log_debug ("Successfully committed transaction."); - status = 0; - } - else - log_warn ("Failed to commit transaction: %s", - PQerrorMessage (db->conn)); - PQclear (r); - } - return status; -} /* c_psql_commit */ - static int c_psql_write (const data_set_t *ds, const value_list_t *vl, user_data_t *ud) { @@ -862,10 +908,10 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, if ((PGRES_COMMAND_OK != PQresultStatus (res)) && (PGRES_TUPLES_OK != PQresultStatus (res))) { + PQclear (res); + if ((CONNECTION_OK != PQstatus (db->conn)) && (0 == c_psql_check_connection (db))) { - PQclear (res); - /* try again */ res = PQexecParams (db->conn, writer->statement, STATIC_ARRAY_SIZE (params), NULL, @@ -874,6 +920,7 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, if ((PGRES_COMMAND_OK == PQresultStatus (res)) || (PGRES_TUPLES_OK == PQresultStatus (res))) { + PQclear (res); success = 1; continue; } @@ -888,20 +935,20 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, params[4], params[5], params[6], params[7]); /* this will abort any current transaction -> restart */ - if (db->commit_interval > 0) - if (c_psql_commit (db) == 0) - c_psql_begin (db); + if (db->next_commit > 0) + c_psql_commit (db); pthread_mutex_unlock (&db->db_lock); return -1; } + + PQclear (res); success = 1; } if ((db->next_commit > 0) && (cdtime () > db->next_commit)) - if (c_psql_commit (db) == 0) - c_psql_begin (db); + c_psql_commit (db); pthread_mutex_unlock (&db->db_lock); @@ -910,10 +957,62 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, return 0; } /* c_psql_write */ +/* We cannot flush single identifiers as all we do is to commit the currently + * running transaction, thus making sure that all written data is actually + * visible to everybody. */ +static int c_psql_flush (cdtime_t timeout, + __attribute__((unused)) const char *ident, + user_data_t *ud) +{ + c_psql_database_t **dbs = databases; + size_t dbs_num = databases_num; + size_t i; + + if ((ud != NULL) && (ud->data != NULL)) { + dbs = (void *)&ud->data; + dbs_num = 1; + } + + for (i = 0; i < dbs_num; ++i) { + c_psql_database_t *db = dbs[i]; + + /* don't commit if the timeout is larger than the regular commit + * interval as in that case all requested data has already been + * committed */ + if ((db->next_commit > 0) && (db->commit_interval > timeout)) + c_psql_commit (db); + } + return 0; +} /* c_psql_flush */ + static int c_psql_shutdown (void) { + size_t i = 0; + + _Bool had_flush = 0; + plugin_unregister_read_group ("postgresql"); + for (i = 0; i < databases_num; ++i) { + c_psql_database_t *db = databases[i]; + + if (db->writers_num > 0) { + char cb_name[DATA_MAX_NAME_LEN]; + ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", + db->database); + + if (! had_flush) { + plugin_unregister_flush ("postgresql"); + had_flush = 1; + } + + plugin_unregister_flush (cb_name); + plugin_unregister_write (cb_name); + } + + sfree (db); + } + udb_query_free (queries, queries_num); queries = NULL; queries_num = 0; @@ -922,6 +1021,10 @@ static int c_psql_shutdown (void) writers = NULL; writers_num = 0; + sfree (databases); + databases = NULL; + databases_num = 0; + return 0; } /* c_psql_shutdown */ @@ -960,6 +1063,8 @@ static int config_query_param_add (udb_query_t *q, oconfig_item_t *ci) data->params[data->params_num] = C_PSQL_PARAM_USER; else if (0 == strcasecmp (param_str, "interval")) data->params[data->params_num] = C_PSQL_PARAM_INTERVAL; + else if (0 == strcasecmp (param_str, "instance")) + data->params[data->params_num] = C_PSQL_PARAM_INSTANCE; else { log_err ("Invalid parameter \"%s\".", param_str); return 1; @@ -1086,6 +1191,8 @@ static int c_psql_config_database (oconfig_item_t *ci) struct timespec cb_interval = { 0, 0 }; user_data_t ud; + static _Bool have_flush = 0; + int i; if ((1 != ci->values_num) @@ -1111,6 +1218,8 @@ static int c_psql_config_database (oconfig_item_t *ci) cf_util_get_string (c, &db->user); else if (0 == strcasecmp (c->key, "Password")) cf_util_get_string (c, &db->password); + else if (0 == strcasecmp (c->key, "Instance")) + cf_util_get_string (c, &db->instance); else if (0 == strcasecmp (c->key, "SSLMode")) cf_util_get_string (c, &db->sslmode); else if (0 == strcasecmp (c->key, "KRBSrvName")) @@ -1169,17 +1278,30 @@ static int c_psql_config_database (oconfig_item_t *ci) ud.data = db; ud.free_func = c_psql_database_delete; - ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->database); + ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->instance); if (db->queries_num > 0) { CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval); + ++db->ref_cnt; plugin_register_complex_read ("postgresql", cb_name, c_psql_read, /* interval = */ (db->interval > 0) ? &cb_interval : NULL, &ud); } if (db->writers_num > 0) { + ++db->ref_cnt; plugin_register_write (cb_name, c_psql_write, &ud); + + if (! have_flush) { + /* flush all */ + plugin_register_flush ("postgresql", + c_psql_flush, /* user data = */ NULL); + have_flush = 1; + } + + /* flush this connection only */ + ++db->ref_cnt; + plugin_register_flush (cb_name, c_psql_flush, &ud); } else if (db->commit_interval > 0) { log_warn ("Database '%s': You do not have any writers assigned to "